blob: e479d55e1cf13b9c550dc5cd3cdf773181bd262e [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani890bc352014-10-01 22:35:29 -070017
Madan Jampani890bc352014-10-01 22:35:29 -070018import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070021import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070023import org.apache.felix.scr.annotations.Service;
Madan Jampani4e729af2014-11-19 11:11:37 -080024import org.onlab.netty.Endpoint;
25import org.onlab.netty.Message;
26import org.onlab.netty.MessageHandler;
27import org.onlab.netty.MessagingService;
28import org.onlab.netty.NettyMessagingService;
Brian O'Connorabafb502014-12-02 22:26:20 -080029import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.ControllerNode;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
33import org.onosproject.store.cluster.messaging.ClusterMessage;
34import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
35import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani890bc352014-10-01 22:35:29 -070036import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38
Madan Jampani2bfa94c2015-04-11 05:03:49 -070039import com.google.common.base.Objects;
40import com.google.common.util.concurrent.ListenableFuture;
41import com.google.common.util.concurrent.SettableFuture;
42
Jonathan Hart7d656f42015-01-27 14:07:23 -080043import java.io.IOException;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070044import java.util.Set;
45import java.util.concurrent.CompletableFuture;
Madan Jampaniec5ae342015-04-13 15:43:10 -070046import java.util.concurrent.Executor;
Madan Jampani2af244a2015-02-22 13:12:01 -080047import java.util.concurrent.ExecutorService;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070048import java.util.function.Consumer;
49import java.util.function.Function;
50import java.util.stream.Collectors;
Jonathan Hart7d656f42015-01-27 14:07:23 -080051
52import static com.google.common.base.Preconditions.checkArgument;
Madan Jampani24f9efb2014-10-24 18:56:23 -070053
Madan Jampani890bc352014-10-01 22:35:29 -070054@Component(immediate = true)
55@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070056public class ClusterCommunicationManager
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070057 implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070058
59 private final Logger log = LoggerFactory.getLogger(getClass());
60
Madan Jampania5d0d782014-10-07 14:36:00 -070061 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
62 private ClusterService clusterService;
63
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070064 // TODO: This probably should not be a OSGi service.
Madan Jampani890bc352014-10-01 22:35:29 -070065 private MessagingService messagingService;
66
67 @Activate
68 public void activate() {
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070069 ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani2e5f87b2015-02-22 10:37:15 -080070 NettyMessagingService netty = new NettyMessagingService(localNode.ip(), localNode.tcpPort());
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070071 // FIXME: workaround until it becomes a service.
72 try {
73 netty.activate();
74 } catch (Exception e) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070075 log.error("NettyMessagingService#activate", e);
76 }
77 messagingService = netty;
Yuta HIGUCHI1f8cd5f2014-11-04 23:48:55 -080078 log.info("Started on {}:{}", localNode.ip(), localNode.tcpPort());
Madan Jampani890bc352014-10-01 22:35:29 -070079 }
80
81 @Deactivate
82 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070083 // TODO: cleanup messageingService if needed.
Yuta HIGUCHI6c6fb9c2014-10-31 20:21:15 -070084 // FIXME: workaround until it becomes a service.
85 try {
86 ((NettyMessagingService) messagingService).deactivate();
87 } catch (Exception e) {
88 log.error("NettyMessagingService#deactivate", e);
89 }
Madan Jampani890bc352014-10-01 22:35:29 -070090 log.info("Stopped");
91 }
92
93 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -080094 public boolean broadcast(ClusterMessage message) {
Madan Jampani890bc352014-10-01 22:35:29 -070095 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070096 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampaniba472232015-03-04 13:00:50 -080097 byte[] payload = message.getBytes();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070098 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani890bc352014-10-01 22:35:29 -070099 if (!node.equals(localNode)) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800100 ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -0700101 }
102 }
103 return ok;
104 }
105
106 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -0800107 public boolean broadcastIncludeSelf(ClusterMessage message) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800108 boolean ok = true;
Madan Jampaniba472232015-03-04 13:00:50 -0800109 byte[] payload = message.getBytes();
Madan Jampanif5d263b2014-11-13 10:04:40 -0800110 for (ControllerNode node : clusterService.getNodes()) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800111 ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
Madan Jampanif5d263b2014-11-13 10:04:40 -0800112 }
113 return ok;
114 }
115
116 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800117 public boolean multicast(ClusterMessage message, Iterable<NodeId> nodes) {
Madan Jampani890bc352014-10-01 22:35:29 -0700118 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -0700119 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampaniba472232015-03-04 13:00:50 -0800120 byte[] payload = message.getBytes();
Madan Jampani890bc352014-10-01 22:35:29 -0700121 for (NodeId nodeId : nodes) {
122 if (!nodeId.equals(localNode.id())) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800123 ok = unicastUnchecked(message.subject(), payload, nodeId) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -0700124 }
125 }
126 return ok;
127 }
128
129 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800130 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
131 return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800132 }
133
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700134 @Override
135 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
136 SettableFuture<byte[]> response = SettableFuture.create();
137 sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((r, e) -> {
138 if (e == null) {
139 response.set(r);
140 } else {
141 response.setException(e);
142 }
143 });
144 return response;
145 }
146
147 @Override
148 public <M> void broadcast(M message,
149 MessageSubject subject,
150 Function<M, byte[]> encoder) {
151 multicast(message,
152 subject,
153 encoder,
154 clusterService.getNodes()
155 .stream()
156 .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
157 .map(ControllerNode::id)
158 .collect(Collectors.toSet()));
159 }
160
161 @Override
162 public <M> void broadcastIncludeSelf(M message,
163 MessageSubject subject,
164 Function<M, byte[]> encoder) {
165 multicast(message,
166 subject,
167 encoder,
168 clusterService.getNodes()
169 .stream()
170 .map(ControllerNode::id)
171 .collect(Collectors.toSet()));
172 }
173
174 @Override
175 public <M> boolean unicast(M message,
176 MessageSubject subject,
177 Function<M, byte[]> encoder,
178 NodeId toNodeId) {
179 byte[] payload = new ClusterMessage(
180 clusterService.getLocalNode().id(),
181 subject,
182 encoder.apply(message)).getBytes();
183 return unicastUnchecked(subject, payload, toNodeId);
184 }
185
186 @Override
187 public <M> void multicast(M message,
188 MessageSubject subject,
189 Function<M, byte[]> encoder,
190 Set<NodeId> nodes) {
191 byte[] payload = new ClusterMessage(
192 clusterService.getLocalNode().id(),
193 subject,
194 encoder.apply(message)).getBytes();
195 nodes.forEach(nodeId -> unicastUnchecked(subject, payload, nodeId));
196 }
197
198 @Override
199 public <M, R> CompletableFuture<R> sendAndReceive(M message,
200 MessageSubject subject,
201 Function<M, byte[]> encoder,
202 Function<byte[], R> decoder,
203 NodeId toNodeId) {
204 ClusterMessage envelope = new ClusterMessage(
205 clusterService.getLocalNode().id(),
206 subject,
207 encoder.apply(message));
208 return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
209 }
210
211 private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700212 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700213 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800214 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani890bc352014-10-01 22:35:29 -0700215 try {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800216 messagingService.sendAsync(nodeEp, subject.value(), payload);
Madan Jampani890bc352014-10-01 22:35:29 -0700217 return true;
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700218 } catch (IOException e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800219 log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
Madan Jampani98c17602014-10-23 15:33:23 -0700220 return false;
221 }
222 }
223
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700224 private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700225 ControllerNode node = clusterService.getNode(toNodeId);
226 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800227 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700228 return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
Madan Jampani890bc352014-10-01 22:35:29 -0700229 }
230
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800231 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800232 public void addSubscriber(MessageSubject subject,
233 ClusterMessageHandler subscriber,
234 ExecutorService executor) {
235 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber), executor);
236 }
237
238 @Override
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800239 public void removeSubscriber(MessageSubject subject) {
240 messagingService.unregisterHandler(subject.value());
241 }
242
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700243 private final class InternalClusterMessageHandler implements MessageHandler {
Madan Jampani890bc352014-10-01 22:35:29 -0700244
245 private final ClusterMessageHandler handler;
246
247 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
248 this.handler = handler;
249 }
250
251 @Override
252 public void handle(Message message) {
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800253 final ClusterMessage clusterMessage;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700254 try {
Madan Jampaniba472232015-03-04 13:00:50 -0800255 clusterMessage = ClusterMessage.fromBytes(message.payload());
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800256 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800257 log.error("Failed decoding {}", message, e);
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800258 throw e;
259 }
260 try {
Madan Jampani8a895092014-10-17 16:55:50 -0700261 handler.handle(new InternalClusterMessage(clusterMessage, message));
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700262 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800263 log.trace("Failed handling {}", clusterMessage, e);
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700264 throw e;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700265 }
Madan Jampani890bc352014-10-01 22:35:29 -0700266 }
267 }
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700268
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700269 @Override
270 public <M, R> void addSubscriber(MessageSubject subject,
271 Function<byte[], M> decoder,
272 Function<M, R> handler,
273 Function<R, byte[]> encoder,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700274 Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700275 messagingService.registerHandler(subject.value(),
276 new InternalMessageResponder<>(decoder, encoder, handler),
277 executor);
278 }
279
280 @Override
281 public <M> void addSubscriber(MessageSubject subject,
282 Function<byte[], M> decoder,
283 Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700284 Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700285 messagingService.registerHandler(subject.value(),
286 new InternalMessageConsumer<>(decoder, handler),
287 executor);
288 }
289
290 private class InternalMessageResponder<M, R> implements MessageHandler {
291 private final Function<byte[], M> decoder;
292 private final Function<R, byte[]> encoder;
293 private final Function<M, R> handler;
294
295 public InternalMessageResponder(Function<byte[], M> decoder,
296 Function<R, byte[]> encoder,
297 Function<M, R> handler) {
298 this.decoder = decoder;
299 this.encoder = encoder;
300 this.handler = handler;
301 }
302 @Override
303 public void handle(Message message) throws IOException {
304 R response = handler.apply(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
305 message.respond(encoder.apply(response));
306 }
307 }
308
309 private class InternalMessageConsumer<M> implements MessageHandler {
310 private final Function<byte[], M> decoder;
311 private final Consumer<M> consumer;
312
313 public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
314 this.decoder = decoder;
315 this.consumer = consumer;
316 }
317 @Override
318 public void handle(Message message) throws IOException {
319 consumer.accept(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
320 }
321 }
322
Madan Jampani8a895092014-10-17 16:55:50 -0700323 public static final class InternalClusterMessage extends ClusterMessage {
324
325 private final Message rawMessage;
326
327 public InternalClusterMessage(ClusterMessage clusterMessage, Message rawMessage) {
328 super(clusterMessage.sender(), clusterMessage.subject(), clusterMessage.payload());
329 this.rawMessage = rawMessage;
330 }
331
332 @Override
333 public void respond(byte[] response) throws IOException {
334 rawMessage.respond(response);
335 }
336 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800337}