// blob: c72fae8984c0a712d71517d2bb14bc3360a9abc3 [file] [log] [blame]
Madan Jampani890bc352014-10-01 22:35:29 -07001package org.onlab.onos.store.cluster.messaging.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4
5import java.io.IOException;
Madan Jampani890bc352014-10-01 22:35:29 -07006import java.util.Set;
7import java.util.Timer;
8import java.util.TimerTask;
Madan Jampani58819b42014-10-09 13:48:51 -07009
Madan Jampani890bc352014-10-01 22:35:29 -070010import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070013import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070015import org.apache.felix.scr.annotations.Service;
Madan Jampania5d0d782014-10-07 14:36:00 -070016import org.onlab.onos.cluster.ClusterService;
Madan Jampani890bc352014-10-01 22:35:29 -070017import org.onlab.onos.cluster.ControllerNode;
18import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIc057c632014-10-06 18:38:14 -070019import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
20import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
Madan Jampani890bc352014-10-01 22:35:29 -070021import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
22import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
23import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
24import org.onlab.onos.store.cluster.messaging.ClusterMessage;
25import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
26import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampani53e44e62014-10-07 12:39:51 -070027import org.onlab.onos.store.serializers.KryoPoolUtil;
28import org.onlab.onos.store.serializers.KryoSerializer;
29import org.onlab.util.KryoPool;
Madan Jampanic9ed9be2014-10-02 16:13:11 -070030import org.onlab.netty.Endpoint;
31import org.onlab.netty.Message;
32import org.onlab.netty.MessageHandler;
33import org.onlab.netty.MessagingService;
Madan Jampanida1a6b02014-10-07 14:16:15 -070034import org.onlab.netty.NettyMessagingService;
Madan Jampani890bc352014-10-01 22:35:29 -070035import org.slf4j.Logger;
36import org.slf4j.LoggerFactory;
37
38@Component(immediate = true)
39@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070040public class ClusterCommunicationManager
Madan Jampani890bc352014-10-01 22:35:29 -070041 implements ClusterCommunicationService, ClusterCommunicationAdminService {
42
43 private final Logger log = LoggerFactory.getLogger(getClass());
44
45 private ControllerNode localNode;
Madan Jampania5d0d782014-10-07 14:36:00 -070046
47 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
48 private ClusterService clusterService;
49
Madan Jampani890bc352014-10-01 22:35:29 -070050 private ClusterNodesDelegate nodesDelegate;
Madan Jampani890bc352014-10-01 22:35:29 -070051 private final Timer timer = new Timer("onos-controller-heatbeats");
52 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
53
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070054 // TODO: This probably should not be a OSGi service.
Madan Jampani890bc352014-10-01 22:35:29 -070055 private MessagingService messagingService;
56
Madan Jampani53e44e62014-10-07 12:39:51 -070057 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070058 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -070059 protected void setupKryoPool() {
60 serializerPool = KryoPool.newBuilder()
61 .register(KryoPoolUtil.API)
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070062 .register(ClusterMessage.class, new ClusterMessageSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -070063 .register(ClusterMembershipEvent.class)
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070064 .register(byte[].class)
Yuta HIGUCHI3e0b6512014-10-07 17:43:56 -070065 .register(MessageSubject.class, new MessageSubjectSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -070066 .build()
67 .populate(1);
68 }
69
70 };
71
Madan Jampani890bc352014-10-01 22:35:29 -070072 @Activate
73 public void activate() {
Madan Jampania5d0d782014-10-07 14:36:00 -070074 localNode = clusterService.getLocalNode();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070075 NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
76 // FIXME: workaround until it becomes a service.
77 try {
78 netty.activate();
79 } catch (Exception e) {
80 // TODO Auto-generated catch block
81 log.error("NettyMessagingService#activate", e);
82 }
83 messagingService = netty;
Madan Jampani890bc352014-10-01 22:35:29 -070084 log.info("Started");
85 }
86
87 @Deactivate
88 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070089 // TODO: cleanup messageingService if needed.
Madan Jampani890bc352014-10-01 22:35:29 -070090 log.info("Stopped");
91 }
92
93 @Override
94 public boolean broadcast(ClusterMessage message) {
95 boolean ok = true;
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070096 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani890bc352014-10-01 22:35:29 -070097 if (!node.equals(localNode)) {
98 ok = unicast(message, node.id()) && ok;
99 }
100 }
101 return ok;
102 }
103
104 @Override
105 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
106 boolean ok = true;
107 for (NodeId nodeId : nodes) {
108 if (!nodeId.equals(localNode.id())) {
109 ok = unicast(message, nodeId) && ok;
110 }
111 }
112 return ok;
113 }
114
115 @Override
116 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700117 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700118 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
119 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
120 try {
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700121 messagingService.sendAsync(nodeEp,
Yuta HIGUCHI68d90472014-10-07 17:55:50 -0700122 message.subject().value(), SERIALIZER.encode(message));
Madan Jampani890bc352014-10-01 22:35:29 -0700123 return true;
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700124 } catch (IOException e) {
Madan Jampani890bc352014-10-01 22:35:29 -0700125 log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
126 }
127
128 return false;
129 }
130
131 @Override
132 public void addSubscriber(MessageSubject subject,
133 ClusterMessageHandler subscriber) {
134 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
135 }
136
137 @Override
138 public void initialize(ControllerNode localNode,
139 ClusterNodesDelegate delegate) {
140 this.localNode = localNode;
141 this.nodesDelegate = delegate;
142 this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
143 timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
144 }
145
146 @Override
147 public void addNode(ControllerNode node) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700148 //members.put(node.id(), node);
Madan Jampani890bc352014-10-01 22:35:29 -0700149 }
150
151 @Override
152 public void removeNode(ControllerNode node) {
153 broadcast(new ClusterMessage(
154 localNode.id(),
155 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
Madan Jampani53e44e62014-10-07 12:39:51 -0700156 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700157 //members.remove(node.id());
Madan Jampani890bc352014-10-01 22:35:29 -0700158 }
159
160 // Sends a heart beat to all peers.
161 private class KeepAlive extends TimerTask {
162
163 @Override
164 public void run() {
165 broadcast(new ClusterMessage(
166 localNode.id(),
167 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
Madan Jampani53e44e62014-10-07 12:39:51 -0700168 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
Madan Jampani890bc352014-10-01 22:35:29 -0700169 }
170 }
171
172 private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
173
174 @Override
175 public void handle(ClusterMessage message) {
176
Madan Jampani53e44e62014-10-07 12:39:51 -0700177 ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
Madan Jampani890bc352014-10-01 22:35:29 -0700178 ControllerNode node = event.node();
179 if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
180 log.info("Node {} sent a hearbeat", node.id());
181 nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
182 } else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
183 log.info("Node {} is leaving", node.id());
184 nodesDelegate.nodeRemoved(node.id());
185 } else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
186 log.info("Node {} is unreachable", node.id());
187 nodesDelegate.nodeVanished(node.id());
188 }
189 }
190 }
191
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700192 private final class InternalClusterMessageHandler implements MessageHandler {
Madan Jampani890bc352014-10-01 22:35:29 -0700193
194 private final ClusterMessageHandler handler;
195
196 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
197 this.handler = handler;
198 }
199
200 @Override
201 public void handle(Message message) {
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700202 try {
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700203 ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700204 handler.handle(clusterMessage);
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700205 } catch (Exception e) {
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700206 log.error("Exception caught during ClusterMessageHandler", e);
207 throw e;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700208 }
Madan Jampani890bc352014-10-01 22:35:29 -0700209 }
210 }
211}