blob: 1b118732c89adc6b9e3285bf009bb2301e586f1e [file] [log] [blame]
Madan Jampani890bc352014-10-01 22:35:29 -07001package org.onlab.onos.store.cluster.messaging.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4
5import java.io.IOException;
Madan Jampani890bc352014-10-01 22:35:29 -07006import java.util.Set;
7import java.util.Timer;
8import java.util.TimerTask;
Madan Jampani890bc352014-10-01 22:35:29 -07009import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070012import org.apache.felix.scr.annotations.Reference;
13import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070014import org.apache.felix.scr.annotations.Service;
Madan Jampania5d0d782014-10-07 14:36:00 -070015import org.onlab.onos.cluster.ClusterService;
Madan Jampani890bc352014-10-01 22:35:29 -070016import org.onlab.onos.cluster.ControllerNode;
17import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIc057c632014-10-06 18:38:14 -070018import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
19import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
Madan Jampani890bc352014-10-01 22:35:29 -070020import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
21import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
22import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
23import org.onlab.onos.store.cluster.messaging.ClusterMessage;
24import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
25import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070026import org.onlab.onos.store.serializers.ClusterMessageSerializer;
Madan Jampani53e44e62014-10-07 12:39:51 -070027import org.onlab.onos.store.serializers.KryoPoolUtil;
28import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI3e0b6512014-10-07 17:43:56 -070029import org.onlab.onos.store.serializers.MessageSubjectSerializer;
Madan Jampani53e44e62014-10-07 12:39:51 -070030import org.onlab.util.KryoPool;
Madan Jampanic9ed9be2014-10-02 16:13:11 -070031import org.onlab.netty.Endpoint;
32import org.onlab.netty.Message;
33import org.onlab.netty.MessageHandler;
34import org.onlab.netty.MessagingService;
Madan Jampanida1a6b02014-10-07 14:16:15 -070035import org.onlab.netty.NettyMessagingService;
Madan Jampani890bc352014-10-01 22:35:29 -070036import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38
39@Component(immediate = true)
40@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070041public class ClusterCommunicationManager
Madan Jampani890bc352014-10-01 22:35:29 -070042 implements ClusterCommunicationService, ClusterCommunicationAdminService {
43
44 private final Logger log = LoggerFactory.getLogger(getClass());
45
46 private ControllerNode localNode;
Madan Jampania5d0d782014-10-07 14:36:00 -070047
48 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
49 private ClusterService clusterService;
50
Madan Jampani890bc352014-10-01 22:35:29 -070051 private ClusterNodesDelegate nodesDelegate;
Madan Jampani890bc352014-10-01 22:35:29 -070052 private final Timer timer = new Timer("onos-controller-heatbeats");
53 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
54
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070055 // TODO: This probably should not be a OSGi service.
Madan Jampani890bc352014-10-01 22:35:29 -070056 private MessagingService messagingService;
57
Madan Jampani53e44e62014-10-07 12:39:51 -070058 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070059 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -070060 protected void setupKryoPool() {
61 serializerPool = KryoPool.newBuilder()
62 .register(KryoPoolUtil.API)
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070063 .register(ClusterMessage.class, new ClusterMessageSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -070064 .register(ClusterMembershipEvent.class)
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070065 .register(byte[].class)
Yuta HIGUCHI3e0b6512014-10-07 17:43:56 -070066 .register(MessageSubject.class, new MessageSubjectSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -070067 .build()
68 .populate(1);
69 }
70
71 };
72
Madan Jampani890bc352014-10-01 22:35:29 -070073 @Activate
74 public void activate() {
Madan Jampania5d0d782014-10-07 14:36:00 -070075 localNode = clusterService.getLocalNode();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070076 NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
77 // FIXME: workaround until it becomes a service.
78 try {
79 netty.activate();
80 } catch (Exception e) {
81 // TODO Auto-generated catch block
82 log.error("NettyMessagingService#activate", e);
83 }
84 messagingService = netty;
Madan Jampani890bc352014-10-01 22:35:29 -070085 log.info("Started");
86 }
87
88 @Deactivate
89 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070090 // TODO: cleanup messageingService if needed.
Madan Jampani890bc352014-10-01 22:35:29 -070091 log.info("Stopped");
92 }
93
94 @Override
95 public boolean broadcast(ClusterMessage message) {
96 boolean ok = true;
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070097 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani890bc352014-10-01 22:35:29 -070098 if (!node.equals(localNode)) {
99 ok = unicast(message, node.id()) && ok;
100 }
101 }
102 return ok;
103 }
104
105 @Override
106 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
107 boolean ok = true;
108 for (NodeId nodeId : nodes) {
109 if (!nodeId.equals(localNode.id())) {
110 ok = unicast(message, nodeId) && ok;
111 }
112 }
113 return ok;
114 }
115
116 @Override
117 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700118 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700119 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
120 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
121 try {
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700122 messagingService.sendAsync(nodeEp,
Yuta HIGUCHI68d90472014-10-07 17:55:50 -0700123 message.subject().value(), SERIALIZER.encode(message));
Madan Jampani890bc352014-10-01 22:35:29 -0700124 return true;
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700125 } catch (IOException e) {
Madan Jampani890bc352014-10-01 22:35:29 -0700126 log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
127 }
128
129 return false;
130 }
131
132 @Override
133 public void addSubscriber(MessageSubject subject,
134 ClusterMessageHandler subscriber) {
135 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
136 }
137
138 @Override
139 public void initialize(ControllerNode localNode,
140 ClusterNodesDelegate delegate) {
141 this.localNode = localNode;
142 this.nodesDelegate = delegate;
143 this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
144 timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
145 }
146
147 @Override
148 public void addNode(ControllerNode node) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700149 //members.put(node.id(), node);
Madan Jampani890bc352014-10-01 22:35:29 -0700150 }
151
152 @Override
153 public void removeNode(ControllerNode node) {
154 broadcast(new ClusterMessage(
155 localNode.id(),
156 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
Madan Jampani53e44e62014-10-07 12:39:51 -0700157 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700158 //members.remove(node.id());
Madan Jampani890bc352014-10-01 22:35:29 -0700159 }
160
161 // Sends a heart beat to all peers.
162 private class KeepAlive extends TimerTask {
163
164 @Override
165 public void run() {
166 broadcast(new ClusterMessage(
167 localNode.id(),
168 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
Madan Jampani53e44e62014-10-07 12:39:51 -0700169 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
Madan Jampani890bc352014-10-01 22:35:29 -0700170 }
171 }
172
173 private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
174
175 @Override
176 public void handle(ClusterMessage message) {
177
Madan Jampani53e44e62014-10-07 12:39:51 -0700178 ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
Madan Jampani890bc352014-10-01 22:35:29 -0700179 ControllerNode node = event.node();
180 if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
181 log.info("Node {} sent a hearbeat", node.id());
182 nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
183 } else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
184 log.info("Node {} is leaving", node.id());
185 nodesDelegate.nodeRemoved(node.id());
186 } else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
187 log.info("Node {} is unreachable", node.id());
188 nodesDelegate.nodeVanished(node.id());
189 }
190 }
191 }
192
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700193 private final class InternalClusterMessageHandler implements MessageHandler {
Madan Jampani890bc352014-10-01 22:35:29 -0700194
195 private final ClusterMessageHandler handler;
196
197 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
198 this.handler = handler;
199 }
200
201 @Override
202 public void handle(Message message) {
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700203 try {
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700204 ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700205 handler.handle(clusterMessage);
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700206 } catch (Exception e) {
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700207 log.error("Exception caught during ClusterMessageHandler", e);
208 throw e;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700209 }
Madan Jampani890bc352014-10-01 22:35:29 -0700210 }
211 }
212}