blob: 204e0fdf5ca3f7ebc0bfca2e7f9342d12ff70729 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani890bc352014-10-01 22:35:29 -070017
Madan Jampani890bc352014-10-01 22:35:29 -070018import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070021import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070023import org.apache.felix.scr.annotations.Service;
Madan Jampani4e729af2014-11-19 11:11:37 -080024import org.onlab.netty.Endpoint;
25import org.onlab.netty.Message;
26import org.onlab.netty.MessageHandler;
27import org.onlab.netty.MessagingService;
28import org.onlab.netty.NettyMessagingService;
Brian O'Connorabafb502014-12-02 22:26:20 -080029import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.ControllerNode;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
33import org.onosproject.store.cluster.messaging.ClusterMessage;
34import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
35import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampani890bc352014-10-01 22:35:29 -070036import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38
Madan Jampani2bfa94c2015-04-11 05:03:49 -070039import com.google.common.base.Objects;
40import com.google.common.util.concurrent.ListenableFuture;
41import com.google.common.util.concurrent.SettableFuture;
42
Jonathan Hart7d656f42015-01-27 14:07:23 -080043import java.io.IOException;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070044import java.util.Set;
45import java.util.concurrent.CompletableFuture;
Madan Jampani2af244a2015-02-22 13:12:01 -080046import java.util.concurrent.ExecutorService;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070047import java.util.function.Consumer;
48import java.util.function.Function;
49import java.util.stream.Collectors;
Jonathan Hart7d656f42015-01-27 14:07:23 -080050
51import static com.google.common.base.Preconditions.checkArgument;
Madan Jampani24f9efb2014-10-24 18:56:23 -070052
Madan Jampani890bc352014-10-01 22:35:29 -070053@Component(immediate = true)
54@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070055public class ClusterCommunicationManager
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070056 implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070057
58 private final Logger log = LoggerFactory.getLogger(getClass());
59
Madan Jampania5d0d782014-10-07 14:36:00 -070060 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
61 private ClusterService clusterService;
62
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070063 // TODO: This probably should not be a OSGi service.
Madan Jampani890bc352014-10-01 22:35:29 -070064 private MessagingService messagingService;
65
66 @Activate
67 public void activate() {
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070068 ControllerNode localNode = clusterService.getLocalNode();
Madan Jampani2e5f87b2015-02-22 10:37:15 -080069 NettyMessagingService netty = new NettyMessagingService(localNode.ip(), localNode.tcpPort());
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070070 // FIXME: workaround until it becomes a service.
71 try {
72 netty.activate();
73 } catch (Exception e) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070074 log.error("NettyMessagingService#activate", e);
75 }
76 messagingService = netty;
Yuta HIGUCHI1f8cd5f2014-11-04 23:48:55 -080077 log.info("Started on {}:{}", localNode.ip(), localNode.tcpPort());
Madan Jampani890bc352014-10-01 22:35:29 -070078 }
79
80 @Deactivate
81 public void deactivate() {
Yuta HIGUCHI993d7aa2014-10-06 22:54:38 -070082 // TODO: cleanup messageingService if needed.
Yuta HIGUCHI6c6fb9c2014-10-31 20:21:15 -070083 // FIXME: workaround until it becomes a service.
84 try {
85 ((NettyMessagingService) messagingService).deactivate();
86 } catch (Exception e) {
87 log.error("NettyMessagingService#deactivate", e);
88 }
Madan Jampani890bc352014-10-01 22:35:29 -070089 log.info("Stopped");
90 }
91
92 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -080093 public boolean broadcast(ClusterMessage message) {
Madan Jampani890bc352014-10-01 22:35:29 -070094 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070095 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampaniba472232015-03-04 13:00:50 -080096 byte[] payload = message.getBytes();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070097 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani890bc352014-10-01 22:35:29 -070098 if (!node.equals(localNode)) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -080099 ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -0700100 }
101 }
102 return ok;
103 }
104
105 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -0800106 public boolean broadcastIncludeSelf(ClusterMessage message) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800107 boolean ok = true;
Madan Jampaniba472232015-03-04 13:00:50 -0800108 byte[] payload = message.getBytes();
Madan Jampanif5d263b2014-11-13 10:04:40 -0800109 for (ControllerNode node : clusterService.getNodes()) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800110 ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
Madan Jampanif5d263b2014-11-13 10:04:40 -0800111 }
112 return ok;
113 }
114
115 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800116 public boolean multicast(ClusterMessage message, Iterable<NodeId> nodes) {
Madan Jampani890bc352014-10-01 22:35:29 -0700117 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -0700118 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampaniba472232015-03-04 13:00:50 -0800119 byte[] payload = message.getBytes();
Madan Jampani890bc352014-10-01 22:35:29 -0700120 for (NodeId nodeId : nodes) {
121 if (!nodeId.equals(localNode.id())) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800122 ok = unicastUnchecked(message.subject(), payload, nodeId) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -0700123 }
124 }
125 return ok;
126 }
127
128 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800129 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
130 return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800131 }
132
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700133 @Override
134 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
135 SettableFuture<byte[]> response = SettableFuture.create();
136 sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((r, e) -> {
137 if (e == null) {
138 response.set(r);
139 } else {
140 response.setException(e);
141 }
142 });
143 return response;
144 }
145
146 @Override
147 public <M> void broadcast(M message,
148 MessageSubject subject,
149 Function<M, byte[]> encoder) {
150 multicast(message,
151 subject,
152 encoder,
153 clusterService.getNodes()
154 .stream()
155 .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
156 .map(ControllerNode::id)
157 .collect(Collectors.toSet()));
158 }
159
160 @Override
161 public <M> void broadcastIncludeSelf(M message,
162 MessageSubject subject,
163 Function<M, byte[]> encoder) {
164 multicast(message,
165 subject,
166 encoder,
167 clusterService.getNodes()
168 .stream()
169 .map(ControllerNode::id)
170 .collect(Collectors.toSet()));
171 }
172
173 @Override
174 public <M> boolean unicast(M message,
175 MessageSubject subject,
176 Function<M, byte[]> encoder,
177 NodeId toNodeId) {
178 byte[] payload = new ClusterMessage(
179 clusterService.getLocalNode().id(),
180 subject,
181 encoder.apply(message)).getBytes();
182 return unicastUnchecked(subject, payload, toNodeId);
183 }
184
185 @Override
186 public <M> void multicast(M message,
187 MessageSubject subject,
188 Function<M, byte[]> encoder,
189 Set<NodeId> nodes) {
190 byte[] payload = new ClusterMessage(
191 clusterService.getLocalNode().id(),
192 subject,
193 encoder.apply(message)).getBytes();
194 nodes.forEach(nodeId -> unicastUnchecked(subject, payload, nodeId));
195 }
196
197 @Override
198 public <M, R> CompletableFuture<R> sendAndReceive(M message,
199 MessageSubject subject,
200 Function<M, byte[]> encoder,
201 Function<byte[], R> decoder,
202 NodeId toNodeId) {
203 ClusterMessage envelope = new ClusterMessage(
204 clusterService.getLocalNode().id(),
205 subject,
206 encoder.apply(message));
207 return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
208 }
209
210 private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700211 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700212 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800213 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani890bc352014-10-01 22:35:29 -0700214 try {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800215 messagingService.sendAsync(nodeEp, subject.value(), payload);
Madan Jampani890bc352014-10-01 22:35:29 -0700216 return true;
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700217 } catch (IOException e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800218 log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
Madan Jampani98c17602014-10-23 15:33:23 -0700219 return false;
220 }
221 }
222
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700223 private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700224 ControllerNode node = clusterService.getNode(toNodeId);
225 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800226 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700227 return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
Madan Jampani890bc352014-10-01 22:35:29 -0700228 }
229
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800230 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800231 public void addSubscriber(MessageSubject subject,
232 ClusterMessageHandler subscriber,
233 ExecutorService executor) {
234 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber), executor);
235 }
236
237 @Override
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800238 public void removeSubscriber(MessageSubject subject) {
239 messagingService.unregisterHandler(subject.value());
240 }
241
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700242 private final class InternalClusterMessageHandler implements MessageHandler {
Madan Jampani890bc352014-10-01 22:35:29 -0700243
244 private final ClusterMessageHandler handler;
245
246 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
247 this.handler = handler;
248 }
249
250 @Override
251 public void handle(Message message) {
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800252 final ClusterMessage clusterMessage;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700253 try {
Madan Jampaniba472232015-03-04 13:00:50 -0800254 clusterMessage = ClusterMessage.fromBytes(message.payload());
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800255 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800256 log.error("Failed decoding {}", message, e);
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800257 throw e;
258 }
259 try {
Madan Jampani8a895092014-10-17 16:55:50 -0700260 handler.handle(new InternalClusterMessage(clusterMessage, message));
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700261 } catch (Exception e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800262 log.trace("Failed handling {}", clusterMessage, e);
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700263 throw e;
Yuta HIGUCHI38bd1452014-10-07 17:37:25 -0700264 }
Madan Jampani890bc352014-10-01 22:35:29 -0700265 }
266 }
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700267
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700268 @Override
269 public <M, R> void addSubscriber(MessageSubject subject,
270 Function<byte[], M> decoder,
271 Function<M, R> handler,
272 Function<R, byte[]> encoder,
273 ExecutorService executor) {
274 messagingService.registerHandler(subject.value(),
275 new InternalMessageResponder<>(decoder, encoder, handler),
276 executor);
277 }
278
279 @Override
280 public <M> void addSubscriber(MessageSubject subject,
281 Function<byte[], M> decoder,
282 Consumer<M> handler,
283 ExecutorService executor) {
284 messagingService.registerHandler(subject.value(),
285 new InternalMessageConsumer<>(decoder, handler),
286 executor);
287 }
288
289 private class InternalMessageResponder<M, R> implements MessageHandler {
290 private final Function<byte[], M> decoder;
291 private final Function<R, byte[]> encoder;
292 private final Function<M, R> handler;
293
294 public InternalMessageResponder(Function<byte[], M> decoder,
295 Function<R, byte[]> encoder,
296 Function<M, R> handler) {
297 this.decoder = decoder;
298 this.encoder = encoder;
299 this.handler = handler;
300 }
301 @Override
302 public void handle(Message message) throws IOException {
303 R response = handler.apply(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
304 message.respond(encoder.apply(response));
305 }
306 }
307
308 private class InternalMessageConsumer<M> implements MessageHandler {
309 private final Function<byte[], M> decoder;
310 private final Consumer<M> consumer;
311
312 public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
313 this.decoder = decoder;
314 this.consumer = consumer;
315 }
316 @Override
317 public void handle(Message message) throws IOException {
318 consumer.accept(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
319 }
320 }
321
Madan Jampani8a895092014-10-17 16:55:50 -0700322 public static final class InternalClusterMessage extends ClusterMessage {
323
324 private final Message rawMessage;
325
326 public InternalClusterMessage(ClusterMessage clusterMessage, Message rawMessage) {
327 super(clusterMessage.sender(), clusterMessage.subject(), clusterMessage.payload());
328 this.rawMessage = rawMessage;
329 }
330
331 @Override
332 public void respond(byte[] response) throws IOException {
333 rawMessage.respond(response);
334 }
335 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800336}