blob: 710d750b928792ad5b61d96f85a1d0a64941470f [file] [log] [blame]
Madan Jampani890bc352014-10-01 22:35:29 -07001package org.onlab.onos.store.cluster.messaging.impl;
2
3import static com.google.common.base.Preconditions.checkArgument;
4
5import java.io.IOException;
Madan Jampani890bc352014-10-01 22:35:29 -07006import java.util.Set;
Madan Jampani4a9cb6d2014-10-17 10:48:50 -07007import java.util.concurrent.TimeUnit;
8import java.util.concurrent.TimeoutException;
9
Madan Jampani890bc352014-10-01 22:35:29 -070010import org.apache.felix.scr.annotations.Activate;
11import org.apache.felix.scr.annotations.Component;
12import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070013import org.apache.felix.scr.annotations.Reference;
14import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070015import org.apache.felix.scr.annotations.Service;
Madan Jampania5d0d782014-10-07 14:36:00 -070016import org.onlab.onos.cluster.ClusterService;
Madan Jampani890bc352014-10-01 22:35:29 -070017import org.onlab.onos.cluster.ControllerNode;
18import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHIc057c632014-10-06 18:38:14 -070019import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
Madan Jampani890bc352014-10-01 22:35:29 -070020import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
21import org.onlab.onos.store.cluster.messaging.ClusterMessage;
22import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani4a9cb6d2014-10-17 10:48:50 -070023import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
Madan Jampani890bc352014-10-01 22:35:29 -070024import org.onlab.onos.store.cluster.messaging.MessageSubject;
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070025import org.onlab.onos.store.serializers.ClusterMessageSerializer;
Madan Jampani53e44e62014-10-07 12:39:51 -070026import org.onlab.onos.store.serializers.KryoPoolUtil;
27import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI3e0b6512014-10-07 17:43:56 -070028import org.onlab.onos.store.serializers.MessageSubjectSerializer;
Madan Jampani53e44e62014-10-07 12:39:51 -070029import org.onlab.util.KryoPool;
Madan Jampanic9ed9be2014-10-02 16:13:11 -070030import org.onlab.netty.Endpoint;
31import org.onlab.netty.Message;
32import org.onlab.netty.MessageHandler;
33import org.onlab.netty.MessagingService;
Madan Jampanida1a6b02014-10-07 14:16:15 -070034import org.onlab.netty.NettyMessagingService;
Madan Jampani4a9cb6d2014-10-17 10:48:50 -070035import org.onlab.netty.Response;
Madan Jampani890bc352014-10-01 22:35:29 -070036import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38
39@Component(immediate = true)
40@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070041public class ClusterCommunicationManager
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070042 implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070043
44 private final Logger log = LoggerFactory.getLogger(getClass());
45
Madan Jampania5d0d782014-10-07 14:36:00 -070046 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
47 private ClusterService clusterService;
48
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070049 // TODO: This probably should not be a OSGi service.
Madan Jampani890bc352014-10-01 22:35:29 -070050 private MessagingService messagingService;
51
Madan Jampani53e44e62014-10-07 12:39:51 -070052 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070053 @Override
Madan Jampani53e44e62014-10-07 12:39:51 -070054 protected void setupKryoPool() {
55 serializerPool = KryoPool.newBuilder()
56 .register(KryoPoolUtil.API)
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070057 .register(ClusterMessage.class, new ClusterMessageSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -070058 .register(ClusterMembershipEvent.class)
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070059 .register(byte[].class)
Yuta HIGUCHI3e0b6512014-10-07 17:43:56 -070060 .register(MessageSubject.class, new MessageSubjectSerializer())
Madan Jampani53e44e62014-10-07 12:39:51 -070061 .build()
62 .populate(1);
63 }
64
65 };
66
Madan Jampani890bc352014-10-01 22:35:29 -070067 @Activate
68 public void activate() {
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070069 ControllerNode localNode = clusterService.getLocalNode();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070070 NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
71 // FIXME: workaround until it becomes a service.
72 try {
73 netty.activate();
74 } catch (Exception e) {
75 // TODO Auto-generated catch block
76 log.error("NettyMessagingService#activate", e);
77 }
78 messagingService = netty;
Madan Jampani890bc352014-10-01 22:35:29 -070079 log.info("Started");
80 }
81
82 @Deactivate
83 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070084 // TODO: cleanup messageingService if needed.
Madan Jampani890bc352014-10-01 22:35:29 -070085 log.info("Stopped");
86 }
87
88 @Override
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -070089 public boolean broadcast(ClusterMessage message) throws IOException {
Madan Jampani890bc352014-10-01 22:35:29 -070090 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070091 final ControllerNode localNode = clusterService.getLocalNode();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070092 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani890bc352014-10-01 22:35:29 -070093 if (!node.equals(localNode)) {
94 ok = unicast(message, node.id()) && ok;
95 }
96 }
97 return ok;
98 }
99
100 @Override
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -0700101 public boolean multicast(ClusterMessage message, Set<NodeId> nodes) throws IOException {
Madan Jampani890bc352014-10-01 22:35:29 -0700102 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -0700103 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani890bc352014-10-01 22:35:29 -0700104 for (NodeId nodeId : nodes) {
105 if (!nodeId.equals(localNode.id())) {
106 ok = unicast(message, nodeId) && ok;
107 }
108 }
109 return ok;
110 }
111
112 @Override
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -0700113 public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700114 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700115 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
116 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
117 try {
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700118 messagingService.sendAsync(nodeEp,
Yuta HIGUCHI68d90472014-10-07 17:55:50 -0700119 message.subject().value(), SERIALIZER.encode(message));
Madan Jampani890bc352014-10-01 22:35:29 -0700120 return true;
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700121 } catch (IOException e) {
Yuta HIGUCHIa58451f2014-10-16 15:44:59 -0700122 log.trace("Failed to send cluster message to nodeId: " + toNodeId, e);
Yuta HIGUCHI9cff53b2014-10-13 15:21:16 -0700123 throw e;
Madan Jampani890bc352014-10-01 22:35:29 -0700124 }
Madan Jampani890bc352014-10-01 22:35:29 -0700125 }
126
127 @Override
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700128 public ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
129 ControllerNode node = clusterService.getNode(toNodeId);
130 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
131 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
132 try {
133 Response responseFuture =
134 messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
135 return new InternalClusterMessageResponse(toNodeId, responseFuture);
136
137 } catch (IOException e) {
138 log.error("Failed interaction with remote nodeId: " + toNodeId, e);
139 throw e;
140 }
141 }
142
143 @Override
Madan Jampani890bc352014-10-01 22:35:29 -0700144 public void addSubscriber(MessageSubject subject,
145 ClusterMessageHandler subscriber) {
146 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
147 }
148
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700149 private final class InternalClusterMessageHandler implements MessageHandler {
Madan Jampani890bc352014-10-01 22:35:29 -0700150
151 private final ClusterMessageHandler handler;
152
153 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
154 this.handler = handler;
155 }
156
157 @Override
158 public void handle(Message message) {
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700159 try {
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700160 ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700161 handler.handle(clusterMessage);
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700162 } catch (Exception e) {
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700163 log.error("Exception caught during ClusterMessageHandler", e);
164 throw e;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700165 }
Madan Jampani890bc352014-10-01 22:35:29 -0700166 }
167 }
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700168
169 private static final class InternalClusterMessageResponse implements ClusterMessageResponse {
170
171 private final NodeId sender;
172 private final Response responseFuture;
173
174 public InternalClusterMessageResponse(NodeId sender, Response responseFuture) {
175 this.sender = sender;
176 this.responseFuture = responseFuture;
177 }
178 @Override
179 public NodeId sender() {
180 return sender;
181 }
182
183 @Override
184 public byte[] get(long timeout, TimeUnit timeunit)
185 throws TimeoutException {
186 return responseFuture.get(timeout, timeunit);
187 }
188
189 @Override
190 public byte[] get(long timeout) throws InterruptedException {
191 return responseFuture.get();
192 }
193 }
Madan Jampani890bc352014-10-01 22:35:29 -0700194}