blob: d8e5fabd7b295488b83c9020687085cef89f7d1a [file] [log] [blame]
Madan Jampani890bc352014-10-01 22:35:29 -07001package org.onlab.onos.store.cluster.messaging.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4
5import java.io.IOException;
Madan Jampani890bc352014-10-01 22:35:29 -07006import java.util.Set;
Madan Jampani24f9efb2014-10-24 18:56:23 -07007
Madan Jampani890bc352014-10-01 22:35:29 -07008import org.apache.felix.scr.annotations.Activate;
9import org.apache.felix.scr.annotations.Component;
10import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070011import org.apache.felix.scr.annotations.Reference;
12import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070013import org.apache.felix.scr.annotations.Service;
Madan Jampania5d0d782014-10-07 14:36:00 -070014import org.onlab.onos.cluster.ClusterService;
Madan Jampani890bc352014-10-01 22:35:29 -070015import org.onlab.onos.cluster.ControllerNode;
16import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIc057c632014-10-06 18:38:14 -070017import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
Madan Jampani890bc352014-10-01 22:35:29 -070018import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
19import org.onlab.onos.store.cluster.messaging.ClusterMessage;
20import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
21import org.onlab.onos.store.cluster.messaging.MessageSubject;
Madan Jampanie0ec3292014-10-20 16:10:51 -070022import org.onlab.onos.store.serializers.ClusterMessageSerializer;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070023import org.onlab.onos.store.serializers.KryoNamespaces;
Madan Jampani53e44e62014-10-07 12:39:51 -070024import org.onlab.onos.store.serializers.KryoSerializer;
Madan Jampanie0ec3292014-10-20 16:10:51 -070025import org.onlab.onos.store.serializers.MessageSubjectSerializer;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070026import org.onlab.util.KryoNamespace;
Madan Jampanic9ed9be2014-10-02 16:13:11 -070027import org.onlab.netty.Endpoint;
28import org.onlab.netty.Message;
29import org.onlab.netty.MessageHandler;
30import org.onlab.netty.MessagingService;
Madan Jampanida1a6b02014-10-07 14:16:15 -070031import org.onlab.netty.NettyMessagingService;
Madan Jampani890bc352014-10-01 22:35:29 -070032import org.slf4j.Logger;
33import org.slf4j.LoggerFactory;
34
Madan Jampani24f9efb2014-10-24 18:56:23 -070035import com.google.common.util.concurrent.ListenableFuture;
36
Madan Jampani890bc352014-10-01 22:35:29 -070037@Component(immediate = true)
38@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070039public class ClusterCommunicationManager
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070040 implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070041
42 private final Logger log = LoggerFactory.getLogger(getClass());
43
Madan Jampania5d0d782014-10-07 14:36:00 -070044 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
45 private ClusterService clusterService;
46
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070047 // TODO: This probably should not be a OSGi service.
Madan Jampani890bc352014-10-01 22:35:29 -070048 private MessagingService messagingService;
49
Madan Jampani53e44e62014-10-07 12:39:51 -070050 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070051 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -070052 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070053 serializerPool = KryoNamespace.newBuilder()
54 .register(KryoNamespaces.API)
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070055 .register(ClusterMessage.class, new ClusterMessageSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -070056 .register(ClusterMembershipEvent.class)
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070057 .register(byte[].class)
Yuta HIGUCHI3e0b6512014-10-07 17:43:56 -070058 .register(MessageSubject.class, new MessageSubjectSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -070059 .build()
60 .populate(1);
61 }
62
63 };
64
Madan Jampani890bc352014-10-01 22:35:29 -070065 @Activate
66 public void activate() {
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070067 ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani87100932014-10-21 16:46:12 -070068 NettyMessagingService netty = new NettyMessagingService(localNode.ip().toString(), localNode.tcpPort());
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070069 // FIXME: workaround until it becomes a service.
70 try {
71 netty.activate();
72 } catch (Exception e) {
73 // TODO Auto-generated catch block
74 log.error("NettyMessagingService#activate", e);
75 }
76 messagingService = netty;
Madan Jampani890bc352014-10-01 22:35:29 -070077 log.info("Started");
78 }
79
80 @Deactivate
81 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070082 // TODO: cleanup messageingService if needed.
Madan Jampani890bc352014-10-01 22:35:29 -070083 log.info("Stopped");
84 }
85
86 @Override
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -070087 public boolean broadcast(ClusterMessage message) throws IOException {
Madan Jampani890bc352014-10-01 22:35:29 -070088 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070089 final ControllerNode localNode = clusterService.getLocalNode();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070090 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani890bc352014-10-01 22:35:29 -070091 if (!node.equals(localNode)) {
92 ok = unicast(message, node.id()) && ok;
93 }
94 }
95 return ok;
96 }
97
98 @Override
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -070099 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) throws IOException {
Madan Jampani890bc352014-10-01 22:35:29 -0700100 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -0700101 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani890bc352014-10-01 22:35:29 -0700102 for (NodeId nodeId : nodes) {
103 if (!nodeId.equals(localNode.id())) {
Madan Jampani98c17602014-10-23 15:33:23 -0700104 ok = unicastUnchecked(message, nodeId) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -0700105 }
106 }
107 return ok;
108 }
109
110 @Override
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -0700111 public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700112 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700113 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
114 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
115 try {
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700116 messagingService.sendAsync(nodeEp,
Yuta HIGUCHI68d90472014-10-07 17:55:50 -0700117 message.subject().value(), SERIALIZER.encode(message));
Madan Jampani890bc352014-10-01 22:35:29 -0700118 return true;
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700119 } catch (IOException e) {
Yuta HIGUCHIa58451f2014-10-16 15:44:59 -0700120 log.trace("Failed to send cluster message to nodeId: " + toNodeId, e);
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -0700121 throw e;
Madan Jampani890bc352014-10-01 22:35:29 -0700122 }
Madan Jampani890bc352014-10-01 22:35:29 -0700123 }
124
Madan Jampani98c17602014-10-23 15:33:23 -0700125 private boolean unicastUnchecked(ClusterMessage message, NodeId toNodeId) throws IOException {
126 try {
127 return unicast(message, toNodeId);
128 } catch (IOException e) {
129 return false;
130 }
131 }
132
Madan Jampani890bc352014-10-01 22:35:29 -0700133 @Override
Madan Jampani24f9efb2014-10-24 18:56:23 -0700134 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700135 ControllerNode node = clusterService.getNode(toNodeId);
136 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
137 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
138 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700139 return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700140
141 } catch (IOException e) {
142 log.error("Failed interaction with remote nodeId: " + toNodeId, e);
143 throw e;
144 }
145 }
146
147 @Override
Madan Jampani890bc352014-10-01 22:35:29 -0700148 public void addSubscriber(MessageSubject subject,
149 ClusterMessageHandler subscriber) {
150 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
151 }
152
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700153 private final class InternalClusterMessageHandler implements MessageHandler {
Madan Jampani890bc352014-10-01 22:35:29 -0700154
155 private final ClusterMessageHandler handler;
156
157 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
158 this.handler = handler;
159 }
160
161 @Override
162 public void handle(Message message) {
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700163 try {
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700164 ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
Madan Jampani8a895092014-10-17 16:55:50 -0700165 handler.handle(new InternalClusterMessage(clusterMessage, message));
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700166 } catch (Exception e) {
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700167 log.error("Exception caught during ClusterMessageHandler", e);
168 throw e;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700169 }
Madan Jampani890bc352014-10-01 22:35:29 -0700170 }
171 }
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700172
Madan Jampani8a895092014-10-17 16:55:50 -0700173 public static final class InternalClusterMessage extends ClusterMessage {
174
175 private final Message rawMessage;
176
177 public InternalClusterMessage(ClusterMessage clusterMessage, Message rawMessage) {
178 super(clusterMessage.sender(), clusterMessage.subject(), clusterMessage.payload());
179 this.rawMessage = rawMessage;
180 }
181
182 @Override
183 public void respond(byte[] response) throws IOException {
184 rawMessage.respond(response);
185 }
186 }
Madan Jampani24f9efb2014-10-24 18:56:23 -0700187}