blob: c7852aed4725867e7a4220ed11212a06e487903a [file] [log] [blame]
Madan Jampani890bc352014-10-01 22:35:29 -07001package org.onlab.onos.store.cluster.messaging.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4
5import java.io.IOException;
Madan Jampani890bc352014-10-01 22:35:29 -07006import java.util.Set;
Madan Jampani890bc352014-10-01 22:35:29 -07007import org.apache.felix.scr.annotations.Activate;
8import org.apache.felix.scr.annotations.Component;
9import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070010import org.apache.felix.scr.annotations.Reference;
11import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070012import org.apache.felix.scr.annotations.Service;
Madan Jampania5d0d782014-10-07 14:36:00 -070013import org.onlab.onos.cluster.ClusterService;
Madan Jampani890bc352014-10-01 22:35:29 -070014import org.onlab.onos.cluster.ControllerNode;
15import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIc057c632014-10-06 18:38:14 -070016import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
Madan Jampani890bc352014-10-01 22:35:29 -070017import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
18import org.onlab.onos.store.cluster.messaging.ClusterMessage;
19import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
20import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070021import org.onlab.onos.store.serializers.ClusterMessageSerializer;
Madan Jampani53e44e62014-10-07 12:39:51 -070022import org.onlab.onos.store.serializers.KryoPoolUtil;
23import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI3e0b6512014-10-07 17:43:56 -070024import org.onlab.onos.store.serializers.MessageSubjectSerializer;
Madan Jampani53e44e62014-10-07 12:39:51 -070025import org.onlab.util.KryoPool;
Madan Jampanic9ed9be2014-10-02 16:13:11 -070026import org.onlab.netty.Endpoint;
27import org.onlab.netty.Message;
28import org.onlab.netty.MessageHandler;
29import org.onlab.netty.MessagingService;
Madan Jampanida1a6b02014-10-07 14:16:15 -070030import org.onlab.netty.NettyMessagingService;
Madan Jampani890bc352014-10-01 22:35:29 -070031import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
34@Component(immediate = true)
35@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070036public class ClusterCommunicationManager
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070037 implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070038
39 private final Logger log = LoggerFactory.getLogger(getClass());
40
Madan Jampania5d0d782014-10-07 14:36:00 -070041 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
42 private ClusterService clusterService;
43
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070044 // TODO: This probably should not be a OSGi service.
Madan Jampani890bc352014-10-01 22:35:29 -070045 private MessagingService messagingService;
46
Madan Jampani53e44e62014-10-07 12:39:51 -070047 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070048 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -070049 protected void setupKryoPool() {
50 serializerPool = KryoPool.newBuilder()
51 .register(KryoPoolUtil.API)
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070052 .register(ClusterMessage.class, new ClusterMessageSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -070053 .register(ClusterMembershipEvent.class)
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070054 .register(byte[].class)
Yuta HIGUCHI3e0b6512014-10-07 17:43:56 -070055 .register(MessageSubject.class, new MessageSubjectSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -070056 .build()
57 .populate(1);
58 }
59
60 };
61
Madan Jampani890bc352014-10-01 22:35:29 -070062 @Activate
63 public void activate() {
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070064 ControllerNode localNode = clusterService.getLocalNode();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070065 NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
66 // FIXME: workaround until it becomes a service.
67 try {
68 netty.activate();
69 } catch (Exception e) {
70 // TODO Auto-generated catch block
71 log.error("NettyMessagingService#activate", e);
72 }
73 messagingService = netty;
Madan Jampani890bc352014-10-01 22:35:29 -070074 log.info("Started");
75 }
76
77 @Deactivate
78 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070079 // TODO: cleanup messageingService if needed.
Madan Jampani890bc352014-10-01 22:35:29 -070080 log.info("Stopped");
81 }
82
83 @Override
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -070084 public boolean broadcast(ClusterMessage message) throws IOException {
Madan Jampani890bc352014-10-01 22:35:29 -070085 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070086 final ControllerNode localNode = clusterService.getLocalNode();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070087 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani890bc352014-10-01 22:35:29 -070088 if (!node.equals(localNode)) {
89 ok = unicast(message, node.id()) && ok;
90 }
91 }
92 return ok;
93 }
94
95 @Override
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -070096 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) throws IOException {
Madan Jampani890bc352014-10-01 22:35:29 -070097 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070098 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani890bc352014-10-01 22:35:29 -070099 for (NodeId nodeId : nodes) {
100 if (!nodeId.equals(localNode.id())) {
101 ok = unicast(message, nodeId) && ok;
102 }
103 }
104 return ok;
105 }
106
107 @Override
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -0700108 public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700109 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700110 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
111 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
112 try {
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700113 messagingService.sendAsync(nodeEp,
Yuta HIGUCHI68d90472014-10-07 17:55:50 -0700114 message.subject().value(), SERIALIZER.encode(message));
Madan Jampani890bc352014-10-01 22:35:29 -0700115 return true;
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700116 } catch (IOException e) {
Madan Jampani890bc352014-10-01 22:35:29 -0700117 log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -0700118 throw e;
Madan Jampani890bc352014-10-01 22:35:29 -0700119 }
Madan Jampani890bc352014-10-01 22:35:29 -0700120 }
121
122 @Override
123 public void addSubscriber(MessageSubject subject,
124 ClusterMessageHandler subscriber) {
125 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
126 }
127
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700128 private final class InternalClusterMessageHandler implements MessageHandler {
Madan Jampani890bc352014-10-01 22:35:29 -0700129
130 private final ClusterMessageHandler handler;
131
132 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
133 this.handler = handler;
134 }
135
136 @Override
137 public void handle(Message message) {
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700138 try {
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700139 ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700140 handler.handle(clusterMessage);
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700141 } catch (Exception e) {
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700142 log.error("Exception caught during ClusterMessageHandler", e);
143 throw e;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700144 }
Madan Jampani890bc352014-10-01 22:35:29 -0700145 }
146 }
147}