blob: c732c16c55a0315d3b6c39a0c2b4f940c62d2d4a [file] [log] [blame]
Madan Jampani890bc352014-10-01 22:35:29 -07001package org.onlab.onos.store.cluster.messaging.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4
5import java.io.IOException;
Madan Jampani890bc352014-10-01 22:35:29 -07006import java.util.Set;
7import java.util.Timer;
8import java.util.TimerTask;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -07009import java.util.concurrent.TimeUnit;
10import java.util.concurrent.TimeoutException;
Madan Jampani890bc352014-10-01 22:35:29 -070011
12import org.apache.felix.scr.annotations.Activate;
13import org.apache.felix.scr.annotations.Component;
14import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070015import org.apache.felix.scr.annotations.Reference;
16import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070017import org.apache.felix.scr.annotations.Service;
Madan Jampania5d0d782014-10-07 14:36:00 -070018import org.onlab.onos.cluster.ClusterService;
Madan Jampani890bc352014-10-01 22:35:29 -070019import org.onlab.onos.cluster.ControllerNode;
20import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIc057c632014-10-06 18:38:14 -070021import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
22import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
Madan Jampani890bc352014-10-01 22:35:29 -070023import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
24import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
25import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
26import org.onlab.onos.store.cluster.messaging.ClusterMessage;
27import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
28import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070029import org.onlab.onos.store.serializers.ClusterMessageSerializer;
Madan Jampani53e44e62014-10-07 12:39:51 -070030import org.onlab.onos.store.serializers.KryoPoolUtil;
31import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI3e0b6512014-10-07 17:43:56 -070032import org.onlab.onos.store.serializers.MessageSubjectSerializer;
Madan Jampani53e44e62014-10-07 12:39:51 -070033import org.onlab.util.KryoPool;
Madan Jampanic9ed9be2014-10-02 16:13:11 -070034import org.onlab.netty.Endpoint;
35import org.onlab.netty.Message;
36import org.onlab.netty.MessageHandler;
37import org.onlab.netty.MessagingService;
Madan Jampanida1a6b02014-10-07 14:16:15 -070038import org.onlab.netty.NettyMessagingService;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -070039import org.onlab.netty.Response;
Madan Jampani890bc352014-10-01 22:35:29 -070040import org.slf4j.Logger;
41import org.slf4j.LoggerFactory;
42
43@Component(immediate = true)
44@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070045public class ClusterCommunicationManager
Madan Jampani890bc352014-10-01 22:35:29 -070046 implements ClusterCommunicationService, ClusterCommunicationAdminService {
47
48 private final Logger log = LoggerFactory.getLogger(getClass());
49
50 private ControllerNode localNode;
Madan Jampania5d0d782014-10-07 14:36:00 -070051
52 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
53 private ClusterService clusterService;
54
Madan Jampani890bc352014-10-01 22:35:29 -070055 private ClusterNodesDelegate nodesDelegate;
Madan Jampani890bc352014-10-01 22:35:29 -070056 private final Timer timer = new Timer("onos-controller-heatbeats");
57 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
58
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070059 // TODO: This probably should not be a OSGi service.
Madan Jampani890bc352014-10-01 22:35:29 -070060 private MessagingService messagingService;
61
Madan Jampani53e44e62014-10-07 12:39:51 -070062 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070063 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -070064 protected void setupKryoPool() {
65 serializerPool = KryoPool.newBuilder()
66 .register(KryoPoolUtil.API)
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070067 .register(ClusterMessage.class, new ClusterMessageSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -070068 .register(ClusterMembershipEvent.class)
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070069 .register(byte[].class)
Yuta HIGUCHI3e0b6512014-10-07 17:43:56 -070070 .register(MessageSubject.class, new MessageSubjectSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -070071 .build()
72 .populate(1);
73 }
74
75 };
76
Madan Jampani890bc352014-10-01 22:35:29 -070077 @Activate
78 public void activate() {
Madan Jampania5d0d782014-10-07 14:36:00 -070079 localNode = clusterService.getLocalNode();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070080 NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
81 // FIXME: workaround until it becomes a service.
82 try {
83 netty.activate();
84 } catch (Exception e) {
85 // TODO Auto-generated catch block
86 log.error("NettyMessagingService#activate", e);
87 }
88 messagingService = netty;
Madan Jampani890bc352014-10-01 22:35:29 -070089 log.info("Started");
90 }
91
92 @Deactivate
93 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070094 // TODO: cleanup messageingService if needed.
Madan Jampani890bc352014-10-01 22:35:29 -070095 log.info("Stopped");
96 }
97
98 @Override
99 public boolean broadcast(ClusterMessage message) {
100 boolean ok = true;
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700101 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani890bc352014-10-01 22:35:29 -0700102 if (!node.equals(localNode)) {
103 ok = unicast(message, node.id()) && ok;
104 }
105 }
106 return ok;
107 }
108
109 @Override
110 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
111 boolean ok = true;
112 for (NodeId nodeId : nodes) {
113 if (!nodeId.equals(localNode.id())) {
114 ok = unicast(message, nodeId) && ok;
115 }
116 }
117 return ok;
118 }
119
120 @Override
121 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700122 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700123 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
124 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
125 try {
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700126 log.info("sending...");
Yuta HIGUCHI68d90472014-10-07 17:55:50 -0700127 Response resp = messagingService.sendAndReceive(nodeEp,
128 message.subject().value(), SERIALIZER.encode(message));
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700129 resp.get(1, TimeUnit.SECONDS);
130 log.info("sent...");
Madan Jampani890bc352014-10-01 22:35:29 -0700131 return true;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700132 } catch (IOException | TimeoutException e) {
Madan Jampani890bc352014-10-01 22:35:29 -0700133 log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
134 }
135
136 return false;
137 }
138
139 @Override
140 public void addSubscriber(MessageSubject subject,
141 ClusterMessageHandler subscriber) {
142 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
143 }
144
145 @Override
146 public void initialize(ControllerNode localNode,
147 ClusterNodesDelegate delegate) {
148 this.localNode = localNode;
149 this.nodesDelegate = delegate;
150 this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
151 timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
152 }
153
154 @Override
155 public void addNode(ControllerNode node) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700156 //members.put(node.id(), node);
Madan Jampani890bc352014-10-01 22:35:29 -0700157 }
158
159 @Override
160 public void removeNode(ControllerNode node) {
161 broadcast(new ClusterMessage(
162 localNode.id(),
163 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
Madan Jampani53e44e62014-10-07 12:39:51 -0700164 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700165 //members.remove(node.id());
Madan Jampani890bc352014-10-01 22:35:29 -0700166 }
167
168 // Sends a heart beat to all peers.
169 private class KeepAlive extends TimerTask {
170
171 @Override
172 public void run() {
173 broadcast(new ClusterMessage(
174 localNode.id(),
175 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
Madan Jampani53e44e62014-10-07 12:39:51 -0700176 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
Madan Jampani890bc352014-10-01 22:35:29 -0700177 }
178 }
179
180 private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
181
182 @Override
183 public void handle(ClusterMessage message) {
184
Madan Jampani53e44e62014-10-07 12:39:51 -0700185 ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
Madan Jampani890bc352014-10-01 22:35:29 -0700186 ControllerNode node = event.node();
187 if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
188 log.info("Node {} sent a hearbeat", node.id());
189 nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
190 } else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
191 log.info("Node {} is leaving", node.id());
192 nodesDelegate.nodeRemoved(node.id());
193 } else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
194 log.info("Node {} is unreachable", node.id());
195 nodesDelegate.nodeVanished(node.id());
196 }
197 }
198 }
199
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700200 // FIXME: revert static
201 private class InternalClusterMessageHandler implements MessageHandler {
Madan Jampani890bc352014-10-01 22:35:29 -0700202
203 private final ClusterMessageHandler handler;
204
205 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
206 this.handler = handler;
207 }
208
209 @Override
210 public void handle(Message message) {
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700211 // FIXME: remove me
212 log.info("InternalClusterMessageHandler.handle({})", message);
213 try {
214 log.info("before decode");
215 ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
216 log.info("Subject:({}), Sender:({})", clusterMessage.subject(), clusterMessage.sender());
217 handler.handle(clusterMessage);
218 message.respond("ACK".getBytes());
219 } catch (Exception e) {
220 // TODO Auto-generated catch block
221 log.error("failed", e);
222 }
Madan Jampani890bc352014-10-01 22:35:29 -0700223 }
224 }
225}