blob: 3a9fd8d6e3f3675cbd849b6aa68491171a3c419e [file] [log] [blame]
Madan Jampani890bc352014-10-01 22:35:29 -07001package org.onlab.onos.store.cluster.messaging.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4
5import java.io.IOException;
6import java.util.HashMap;
7import java.util.Map;
8import java.util.Set;
9import java.util.Timer;
10import java.util.TimerTask;
11
12import org.apache.felix.scr.annotations.Activate;
13import org.apache.felix.scr.annotations.Component;
14import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070015import org.apache.felix.scr.annotations.Reference;
16import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070017import org.apache.felix.scr.annotations.Service;
Madan Jampania5d0d782014-10-07 14:36:00 -070018import org.onlab.onos.cluster.ClusterService;
Madan Jampani890bc352014-10-01 22:35:29 -070019import org.onlab.onos.cluster.ControllerNode;
20import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIc057c632014-10-06 18:38:14 -070021import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
22import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
Madan Jampani890bc352014-10-01 22:35:29 -070023import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
24import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
25import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
26import org.onlab.onos.store.cluster.messaging.ClusterMessage;
27import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
28import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070029import org.onlab.onos.store.serializers.ClusterMessageSerializer;
Madan Jampani53e44e62014-10-07 12:39:51 -070030import org.onlab.onos.store.serializers.KryoPoolUtil;
31import org.onlab.onos.store.serializers.KryoSerializer;
32import org.onlab.util.KryoPool;
Madan Jampanic9ed9be2014-10-02 16:13:11 -070033import org.onlab.netty.Endpoint;
34import org.onlab.netty.Message;
35import org.onlab.netty.MessageHandler;
36import org.onlab.netty.MessagingService;
Madan Jampanida1a6b02014-10-07 14:16:15 -070037import org.onlab.netty.NettyMessagingService;
Madan Jampani890bc352014-10-01 22:35:29 -070038import org.slf4j.Logger;
39import org.slf4j.LoggerFactory;
40
41@Component(immediate = true)
42@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070043public class ClusterCommunicationManager
Madan Jampani890bc352014-10-01 22:35:29 -070044 implements ClusterCommunicationService, ClusterCommunicationAdminService {
45
46 private final Logger log = LoggerFactory.getLogger(getClass());
47
48 private ControllerNode localNode;
Madan Jampania5d0d782014-10-07 14:36:00 -070049
50 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
51 private ClusterService clusterService;
52
Madan Jampani890bc352014-10-01 22:35:29 -070053 private ClusterNodesDelegate nodesDelegate;
Madan Jampani890bc352014-10-01 22:35:29 -070054 private final Timer timer = new Timer("onos-controller-heatbeats");
55 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
56
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070057 // TODO: This probably should not be a OSGi service.
Madan Jampani890bc352014-10-01 22:35:29 -070058 private MessagingService messagingService;
59
Madan Jampani53e44e62014-10-07 12:39:51 -070060 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070061 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -070062 protected void setupKryoPool() {
63 serializerPool = KryoPool.newBuilder()
64 .register(KryoPoolUtil.API)
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070065 .register(ClusterMessage.class, new ClusterMessageSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -070066 .register(ClusterMembershipEvent.class)
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070067 .register(byte[].class)
68 .register(MessageSubject.class)
Madan Jampani53e44e62014-10-07 12:39:51 -070069 .build()
70 .populate(1);
71 }
72
73 };
74
Madan Jampani890bc352014-10-01 22:35:29 -070075 @Activate
76 public void activate() {
Madan Jampania5d0d782014-10-07 14:36:00 -070077 localNode = clusterService.getLocalNode();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070078 NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
79 // FIXME: workaround until it becomes a service.
80 try {
81 netty.activate();
82 } catch (Exception e) {
83 // TODO Auto-generated catch block
84 log.error("NettyMessagingService#activate", e);
85 }
86 messagingService = netty;
Madan Jampani890bc352014-10-01 22:35:29 -070087 log.info("Started");
88 }
89
90 @Deactivate
91 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070092 // TODO: cleanup messageingService if needed.
Madan Jampani890bc352014-10-01 22:35:29 -070093 log.info("Stopped");
94 }
95
96 @Override
97 public boolean broadcast(ClusterMessage message) {
98 boolean ok = true;
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070099 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani890bc352014-10-01 22:35:29 -0700100 if (!node.equals(localNode)) {
101 ok = unicast(message, node.id()) && ok;
102 }
103 }
104 return ok;
105 }
106
107 @Override
108 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
109 boolean ok = true;
110 for (NodeId nodeId : nodes) {
111 if (!nodeId.equals(localNode.id())) {
112 ok = unicast(message, nodeId) && ok;
113 }
114 }
115 return ok;
116 }
117
118 @Override
119 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700120 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700121 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
122 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
123 try {
Madan Jampani53e44e62014-10-07 12:39:51 -0700124 messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
Madan Jampani890bc352014-10-01 22:35:29 -0700125 return true;
126 } catch (IOException e) {
127 log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
128 }
129
130 return false;
131 }
132
133 @Override
134 public void addSubscriber(MessageSubject subject,
135 ClusterMessageHandler subscriber) {
136 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
137 }
138
139 @Override
140 public void initialize(ControllerNode localNode,
141 ClusterNodesDelegate delegate) {
142 this.localNode = localNode;
143 this.nodesDelegate = delegate;
144 this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
145 timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
146 }
147
148 @Override
149 public void addNode(ControllerNode node) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700150 //members.put(node.id(), node);
Madan Jampani890bc352014-10-01 22:35:29 -0700151 }
152
153 @Override
154 public void removeNode(ControllerNode node) {
155 broadcast(new ClusterMessage(
156 localNode.id(),
157 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
Madan Jampani53e44e62014-10-07 12:39:51 -0700158 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700159 //members.remove(node.id());
Madan Jampani890bc352014-10-01 22:35:29 -0700160 }
161
162 // Sends a heart beat to all peers.
163 private class KeepAlive extends TimerTask {
164
165 @Override
166 public void run() {
167 broadcast(new ClusterMessage(
168 localNode.id(),
169 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
Madan Jampani53e44e62014-10-07 12:39:51 -0700170 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
Madan Jampani890bc352014-10-01 22:35:29 -0700171 }
172 }
173
174 private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
175
176 @Override
177 public void handle(ClusterMessage message) {
178
Madan Jampani53e44e62014-10-07 12:39:51 -0700179 ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
Madan Jampani890bc352014-10-01 22:35:29 -0700180 ControllerNode node = event.node();
181 if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
182 log.info("Node {} sent a hearbeat", node.id());
183 nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
184 } else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
185 log.info("Node {} is leaving", node.id());
186 nodesDelegate.nodeRemoved(node.id());
187 } else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
188 log.info("Node {} is unreachable", node.id());
189 nodesDelegate.nodeVanished(node.id());
190 }
191 }
192 }
193
194 private static class InternalClusterMessageHandler implements MessageHandler {
195
196 private final ClusterMessageHandler handler;
197
198 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
199 this.handler = handler;
200 }
201
202 @Override
203 public void handle(Message message) {
Madan Jampani53e44e62014-10-07 12:39:51 -0700204 ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
205 handler.handle(clusterMessage);
Madan Jampani890bc352014-10-01 22:35:29 -0700206 }
207 }
208}