blob: 1a4512d3dfbf1fff7279b46c649d36dcd08c7963 [file] [log] [blame]
/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani890bc352014-10-01 22:35:29 -070017
Madan Jampani890bc352014-10-01 22:35:29 -070018import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070021import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070023import org.apache.felix.scr.annotations.Service;
Thomas Vachuskab093c912015-04-20 10:28:26 -070024import org.onlab.netty.NettyMessagingManager;
25import org.onlab.nio.service.IOLoopMessagingManager;
Brian O'Connorabafb502014-12-02 22:26:20 -080026import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.ControllerNode;
28import org.onosproject.cluster.NodeId;
29import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
30import org.onosproject.store.cluster.messaging.ClusterMessage;
31import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Madan Jampanic26eede2015-04-16 11:42:16 -070032import org.onosproject.store.cluster.messaging.Endpoint;
Brian O'Connorabafb502014-12-02 22:26:20 -080033import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampanic26eede2015-04-16 11:42:16 -070034import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampani890bc352014-10-01 22:35:29 -070035import org.slf4j.Logger;
36import org.slf4j.LoggerFactory;
37
Madan Jampani2bfa94c2015-04-11 05:03:49 -070038import com.google.common.base.Objects;
39import com.google.common.util.concurrent.ListenableFuture;
40import com.google.common.util.concurrent.SettableFuture;
41
Jonathan Hart7d656f42015-01-27 14:07:23 -080042import java.io.IOException;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070043import java.util.Set;
44import java.util.concurrent.CompletableFuture;
Madan Jampaniec5ae342015-04-13 15:43:10 -070045import java.util.concurrent.Executor;
Madan Jampani2af244a2015-02-22 13:12:01 -080046import java.util.concurrent.ExecutorService;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070047import java.util.function.Consumer;
48import java.util.function.Function;
49import java.util.stream.Collectors;
Jonathan Hart7d656f42015-01-27 14:07:23 -080050
51import static com.google.common.base.Preconditions.checkArgument;
Madan Jampani24f9efb2014-10-24 18:56:23 -070052
Madan Jampani890bc352014-10-01 22:35:29 -070053@Component(immediate = true)
54@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070055public class ClusterCommunicationManager
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070056 implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070057
58 private final Logger log = LoggerFactory.getLogger(getClass());
59
Madan Jampania5d0d782014-10-07 14:36:00 -070060 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
61 private ClusterService clusterService;
62
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070063 // TODO: This probably should not be a OSGi service.
Madan Jampani890bc352014-10-01 22:35:29 -070064 private MessagingService messagingService;
65
Madan Jampanic26eede2015-04-16 11:42:16 -070066 private final boolean useNetty = true;
67
Madan Jampani890bc352014-10-01 22:35:29 -070068 @Activate
69 public void activate() {
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070070 ControllerNode localNode = clusterService.getLocalNode();
Madan Jampanic26eede2015-04-16 11:42:16 -070071 if (useNetty) {
Thomas Vachuskab093c912015-04-20 10:28:26 -070072 NettyMessagingManager netty = new NettyMessagingManager(localNode.ip(), localNode.tcpPort());
Madan Jampanic26eede2015-04-16 11:42:16 -070073 try {
74 netty.activate();
75 messagingService = netty;
76 } catch (Exception e) {
77 log.error("NettyMessagingService#activate", e);
78 }
79 } else {
Thomas Vachuskab093c912015-04-20 10:28:26 -070080 IOLoopMessagingManager ioLoop = new IOLoopMessagingManager(localNode.ip(), localNode.tcpPort());
Madan Jampanic26eede2015-04-16 11:42:16 -070081 try {
82 ioLoop.activate();
83 messagingService = ioLoop;
84 } catch (Exception e) {
85 log.error("IOLoopMessagingService#activate", e);
86 }
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070087 }
Yuta HIGUCHI1f8cd5f2014-11-04 23:48:55 -080088 log.info("Started on {}:{}", localNode.ip(), localNode.tcpPort());
Madan Jampani890bc352014-10-01 22:35:29 -070089 }
90
91 @Deactivate
92 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070093 // TODO: cleanup messageingService if needed.
Yuta HIGUCHI6c6fb9c2014-10-31 20:21:15 -070094 // FIXME: workaround until it becomes a service.
95 try {
Madan Jampanic26eede2015-04-16 11:42:16 -070096 if (useNetty) {
Thomas Vachuskab093c912015-04-20 10:28:26 -070097 ((NettyMessagingManager) messagingService).deactivate();
Madan Jampanic26eede2015-04-16 11:42:16 -070098 } else {
Thomas Vachuskab093c912015-04-20 10:28:26 -070099 ((IOLoopMessagingManager) messagingService).deactivate();
Madan Jampanic26eede2015-04-16 11:42:16 -0700100 }
Yuta HIGUCHI6c6fb9c2014-10-31 20:21:15 -0700101 } catch (Exception e) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700102 log.error("MessagingService#deactivate", e);
Yuta HIGUCHI6c6fb9c2014-10-31 20:21:15 -0700103 }
Madan Jampani890bc352014-10-01 22:35:29 -0700104 log.info("Stopped");
105 }
106
107 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -0800108 public boolean broadcast(ClusterMessage message) {
Madan Jampani890bc352014-10-01 22:35:29 -0700109 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -0700110 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampaniba472232015-03-04 13:00:50 -0800111 byte[] payload = message.getBytes();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700112 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani890bc352014-10-01 22:35:29 -0700113 if (!node.equals(localNode)) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800114 ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -0700115 }
116 }
117 return ok;
118 }
119
120 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -0800121 public boolean broadcastIncludeSelf(ClusterMessage message) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800122 boolean ok = true;
Madan Jampaniba472232015-03-04 13:00:50 -0800123 byte[] payload = message.getBytes();
Madan Jampanif5d263b2014-11-13 10:04:40 -0800124 for (ControllerNode node : clusterService.getNodes()) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800125 ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
Madan Jampanif5d263b2014-11-13 10:04:40 -0800126 }
127 return ok;
128 }
129
130 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800131 public boolean multicast(ClusterMessage message, Iterable<NodeId> nodes) {
Madan Jampani890bc352014-10-01 22:35:29 -0700132 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -0700133 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampaniba472232015-03-04 13:00:50 -0800134 byte[] payload = message.getBytes();
Madan Jampani890bc352014-10-01 22:35:29 -0700135 for (NodeId nodeId : nodes) {
136 if (!nodeId.equals(localNode.id())) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800137 ok = unicastUnchecked(message.subject(), payload, nodeId) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -0700138 }
139 }
140 return ok;
141 }
142
143 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800144 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
145 return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800146 }
147
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700148 @Override
149 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
150 SettableFuture<byte[]> response = SettableFuture.create();
151 sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((r, e) -> {
152 if (e == null) {
153 response.set(r);
154 } else {
155 response.setException(e);
156 }
157 });
158 return response;
159 }
160
161 @Override
162 public <M> void broadcast(M message,
163 MessageSubject subject,
164 Function<M, byte[]> encoder) {
165 multicast(message,
166 subject,
167 encoder,
168 clusterService.getNodes()
169 .stream()
170 .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
171 .map(ControllerNode::id)
172 .collect(Collectors.toSet()));
173 }
174
175 @Override
176 public <M> void broadcastIncludeSelf(M message,
177 MessageSubject subject,
178 Function<M, byte[]> encoder) {
179 multicast(message,
180 subject,
181 encoder,
182 clusterService.getNodes()
183 .stream()
184 .map(ControllerNode::id)
185 .collect(Collectors.toSet()));
186 }
187
188 @Override
189 public <M> boolean unicast(M message,
190 MessageSubject subject,
191 Function<M, byte[]> encoder,
192 NodeId toNodeId) {
193 byte[] payload = new ClusterMessage(
194 clusterService.getLocalNode().id(),
195 subject,
196 encoder.apply(message)).getBytes();
197 return unicastUnchecked(subject, payload, toNodeId);
198 }
199
200 @Override
201 public <M> void multicast(M message,
202 MessageSubject subject,
203 Function<M, byte[]> encoder,
204 Set<NodeId> nodes) {
205 byte[] payload = new ClusterMessage(
206 clusterService.getLocalNode().id(),
207 subject,
208 encoder.apply(message)).getBytes();
209 nodes.forEach(nodeId -> unicastUnchecked(subject, payload, nodeId));
210 }
211
212 @Override
213 public <M, R> CompletableFuture<R> sendAndReceive(M message,
214 MessageSubject subject,
215 Function<M, byte[]> encoder,
216 Function<byte[], R> decoder,
217 NodeId toNodeId) {
218 ClusterMessage envelope = new ClusterMessage(
219 clusterService.getLocalNode().id(),
220 subject,
221 encoder.apply(message));
222 return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
223 }
224
225 private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700226 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700227 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800228 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani890bc352014-10-01 22:35:29 -0700229 try {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800230 messagingService.sendAsync(nodeEp, subject.value(), payload);
Madan Jampani890bc352014-10-01 22:35:29 -0700231 return true;
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700232 } catch (IOException e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800233 log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
Madan Jampani98c17602014-10-23 15:33:23 -0700234 return false;
235 }
236 }
237
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700238 private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700239 ControllerNode node = clusterService.getNode(toNodeId);
240 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800241 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700242 return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
Madan Jampani890bc352014-10-01 22:35:29 -0700243 }
244
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800245 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800246 public void addSubscriber(MessageSubject subject,
247 ClusterMessageHandler subscriber,
248 ExecutorService executor) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700249 messagingService.registerHandler(subject.value(),
250 new InternalClusterMessageHandler(subscriber),
251 executor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800252 }
253
254 @Override
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800255 public void removeSubscriber(MessageSubject subject) {
256 messagingService.unregisterHandler(subject.value());
257 }
258
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700259
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700260 @Override
261 public <M, R> void addSubscriber(MessageSubject subject,
262 Function<byte[], M> decoder,
263 Function<M, R> handler,
264 Function<R, byte[]> encoder,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700265 Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700266 messagingService.registerHandler(subject.value(),
267 new InternalMessageResponder<>(decoder, encoder, handler),
268 executor);
269 }
270
271 @Override
272 public <M> void addSubscriber(MessageSubject subject,
273 Function<byte[], M> decoder,
274 Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700275 Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700276 messagingService.registerHandler(subject.value(),
277 new InternalMessageConsumer<>(decoder, handler),
278 executor);
279 }
280
Madan Jampanic26eede2015-04-16 11:42:16 -0700281 private class InternalClusterMessageHandler implements Function<byte[], byte[]> {
282 private ClusterMessageHandler handler;
283
284 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
285 this.handler = handler;
286 }
287
288 @Override
289 public byte[] apply(byte[] bytes) {
290 ClusterMessage message = ClusterMessage.fromBytes(bytes);
291 handler.handle(message);
292 return message.response();
293 }
294 }
295
296 private class InternalMessageResponder<M, R> implements Function<byte[], byte[]> {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700297 private final Function<byte[], M> decoder;
298 private final Function<R, byte[]> encoder;
299 private final Function<M, R> handler;
300
301 public InternalMessageResponder(Function<byte[], M> decoder,
302 Function<R, byte[]> encoder,
303 Function<M, R> handler) {
304 this.decoder = decoder;
305 this.encoder = encoder;
306 this.handler = handler;
307 }
Madan Jampanic26eede2015-04-16 11:42:16 -0700308
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700309 @Override
Madan Jampanic26eede2015-04-16 11:42:16 -0700310 public byte[] apply(byte[] bytes) {
311 R reply = handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
312 return encoder.apply(reply);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700313 }
314 }
315
Madan Jampanic26eede2015-04-16 11:42:16 -0700316 private class InternalMessageConsumer<M> implements Consumer<byte[]> {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700317 private final Function<byte[], M> decoder;
318 private final Consumer<M> consumer;
319
320 public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
321 this.decoder = decoder;
322 this.consumer = consumer;
323 }
Madan Jampani8a895092014-10-17 16:55:50 -0700324
325 @Override
Madan Jampanic26eede2015-04-16 11:42:16 -0700326 public void accept(byte[] bytes) {
327 consumer.accept(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
Madan Jampani8a895092014-10-17 16:55:50 -0700328 }
329 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800330}