blob: 21b0919a592617fd2c0de7b88d9ff3860a2cde7c [file] [log] [blame]
/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani890bc352014-10-01 22:35:29 -070017
Madan Jampani890bc352014-10-01 22:35:29 -070018import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070021import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070023import org.apache.felix.scr.annotations.Service;
Madan Jampani27b69c62015-05-15 15:49:02 -070024import org.onlab.util.Tools;
Brian O'Connorabafb502014-12-02 22:26:20 -080025import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.ControllerNode;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
29import org.onosproject.store.cluster.messaging.ClusterMessage;
30import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Madan Jampanic26eede2015-04-16 11:42:16 -070031import org.onosproject.store.cluster.messaging.Endpoint;
Brian O'Connorabafb502014-12-02 22:26:20 -080032import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampanic26eede2015-04-16 11:42:16 -070033import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampani890bc352014-10-01 22:35:29 -070034import org.slf4j.Logger;
35import org.slf4j.LoggerFactory;
36
Madan Jampani2bfa94c2015-04-11 05:03:49 -070037import com.google.common.base.Objects;
38import com.google.common.util.concurrent.ListenableFuture;
39import com.google.common.util.concurrent.SettableFuture;
40
Jonathan Hart7d656f42015-01-27 14:07:23 -080041import java.io.IOException;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070042import java.util.Set;
43import java.util.concurrent.CompletableFuture;
Madan Jampaniec5ae342015-04-13 15:43:10 -070044import java.util.concurrent.Executor;
Madan Jampani2af244a2015-02-22 13:12:01 -080045import java.util.concurrent.ExecutorService;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070046import java.util.function.Consumer;
47import java.util.function.Function;
48import java.util.stream.Collectors;
Jonathan Hart7d656f42015-01-27 14:07:23 -080049
50import static com.google.common.base.Preconditions.checkArgument;
Madan Jampani24f9efb2014-10-24 18:56:23 -070051
Madan Jampani890bc352014-10-01 22:35:29 -070052@Component(immediate = true)
53@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070054public class ClusterCommunicationManager
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070055 implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070056
57 private final Logger log = LoggerFactory.getLogger(getClass());
58
Madan Jampania5d0d782014-10-07 14:36:00 -070059 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
60 private ClusterService clusterService;
61
Madan Jampaniafeebbd2015-05-19 15:26:01 -070062 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
63 protected MessagingService messagingService;
Madan Jampanic26eede2015-04-16 11:42:16 -070064
Madan Jampani890bc352014-10-01 22:35:29 -070065 @Activate
66 public void activate() {
Madan Jampaniafeebbd2015-05-19 15:26:01 -070067 log.info("Started");
Madan Jampani890bc352014-10-01 22:35:29 -070068 }
69
70 @Deactivate
71 public void deactivate() {
Madan Jampani890bc352014-10-01 22:35:29 -070072 log.info("Stopped");
73 }
74
75 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -080076 public boolean broadcast(ClusterMessage message) {
Madan Jampani890bc352014-10-01 22:35:29 -070077 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070078 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampaniba472232015-03-04 13:00:50 -080079 byte[] payload = message.getBytes();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070080 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani890bc352014-10-01 22:35:29 -070081 if (!node.equals(localNode)) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -080082 ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -070083 }
84 }
85 return ok;
86 }
87
88 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -080089 public boolean broadcastIncludeSelf(ClusterMessage message) {
Madan Jampanif5d263b2014-11-13 10:04:40 -080090 boolean ok = true;
Madan Jampaniba472232015-03-04 13:00:50 -080091 byte[] payload = message.getBytes();
Madan Jampanif5d263b2014-11-13 10:04:40 -080092 for (ControllerNode node : clusterService.getNodes()) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -080093 ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
Madan Jampanif5d263b2014-11-13 10:04:40 -080094 }
95 return ok;
96 }
97
98 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -080099 public boolean multicast(ClusterMessage message, Iterable<NodeId> nodes) {
Madan Jampani890bc352014-10-01 22:35:29 -0700100 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -0700101 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampaniba472232015-03-04 13:00:50 -0800102 byte[] payload = message.getBytes();
Madan Jampani890bc352014-10-01 22:35:29 -0700103 for (NodeId nodeId : nodes) {
104 if (!nodeId.equals(localNode.id())) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800105 ok = unicastUnchecked(message.subject(), payload, nodeId) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -0700106 }
107 }
108 return ok;
109 }
110
111 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800112 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
113 return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800114 }
115
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700116 @Override
117 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
118 SettableFuture<byte[]> response = SettableFuture.create();
119 sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((r, e) -> {
120 if (e == null) {
121 response.set(r);
122 } else {
123 response.setException(e);
124 }
125 });
126 return response;
127 }
128
129 @Override
130 public <M> void broadcast(M message,
131 MessageSubject subject,
132 Function<M, byte[]> encoder) {
133 multicast(message,
134 subject,
135 encoder,
136 clusterService.getNodes()
137 .stream()
138 .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
139 .map(ControllerNode::id)
140 .collect(Collectors.toSet()));
141 }
142
143 @Override
144 public <M> void broadcastIncludeSelf(M message,
145 MessageSubject subject,
146 Function<M, byte[]> encoder) {
147 multicast(message,
148 subject,
149 encoder,
150 clusterService.getNodes()
151 .stream()
152 .map(ControllerNode::id)
153 .collect(Collectors.toSet()));
154 }
155
156 @Override
157 public <M> boolean unicast(M message,
158 MessageSubject subject,
159 Function<M, byte[]> encoder,
160 NodeId toNodeId) {
161 byte[] payload = new ClusterMessage(
162 clusterService.getLocalNode().id(),
163 subject,
164 encoder.apply(message)).getBytes();
165 return unicastUnchecked(subject, payload, toNodeId);
166 }
167
168 @Override
169 public <M> void multicast(M message,
170 MessageSubject subject,
171 Function<M, byte[]> encoder,
172 Set<NodeId> nodes) {
173 byte[] payload = new ClusterMessage(
174 clusterService.getLocalNode().id(),
175 subject,
176 encoder.apply(message)).getBytes();
177 nodes.forEach(nodeId -> unicastUnchecked(subject, payload, nodeId));
178 }
179
180 @Override
181 public <M, R> CompletableFuture<R> sendAndReceive(M message,
182 MessageSubject subject,
183 Function<M, byte[]> encoder,
184 Function<byte[], R> decoder,
185 NodeId toNodeId) {
Madan Jampani27b69c62015-05-15 15:49:02 -0700186 try {
187 ClusterMessage envelope = new ClusterMessage(
188 clusterService.getLocalNode().id(),
189 subject,
190 encoder.apply(message));
191 return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
192 } catch (Exception e) {
193 return Tools.exceptionalFuture(e);
194 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700195 }
196
197 private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700198 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700199 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800200 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani890bc352014-10-01 22:35:29 -0700201 try {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800202 messagingService.sendAsync(nodeEp, subject.value(), payload);
Madan Jampani890bc352014-10-01 22:35:29 -0700203 return true;
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700204 } catch (IOException e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800205 log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
Madan Jampani98c17602014-10-23 15:33:23 -0700206 return false;
207 }
208 }
209
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700210 private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700211 ControllerNode node = clusterService.getNode(toNodeId);
212 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800213 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700214 return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
Madan Jampani890bc352014-10-01 22:35:29 -0700215 }
216
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800217 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800218 public void addSubscriber(MessageSubject subject,
219 ClusterMessageHandler subscriber,
220 ExecutorService executor) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700221 messagingService.registerHandler(subject.value(),
222 new InternalClusterMessageHandler(subscriber),
223 executor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800224 }
225
226 @Override
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800227 public void removeSubscriber(MessageSubject subject) {
228 messagingService.unregisterHandler(subject.value());
229 }
230
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700231 @Override
232 public <M, R> void addSubscriber(MessageSubject subject,
233 Function<byte[], M> decoder,
234 Function<M, R> handler,
235 Function<R, byte[]> encoder,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700236 Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700237 messagingService.registerHandler(subject.value(),
Madan Jampani27b69c62015-05-15 15:49:02 -0700238 new InternalMessageResponder<M, R>(decoder, encoder, m -> {
239 CompletableFuture<R> responseFuture = new CompletableFuture<>();
240 executor.execute(() -> {
241 try {
242 responseFuture.complete(handler.apply(m));
243 } catch (Exception e) {
244 responseFuture.completeExceptionally(e);
245 }
246 });
247 return responseFuture;
248 }));
249 }
250
251 @Override
252 public <M, R> void addSubscriber(MessageSubject subject,
253 Function<byte[], M> decoder,
254 Function<M, CompletableFuture<R>> handler,
255 Function<R, byte[]> encoder) {
256 messagingService.registerHandler(subject.value(),
257 new InternalMessageResponder<>(decoder, encoder, handler));
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700258 }
259
260 @Override
261 public <M> void addSubscriber(MessageSubject subject,
262 Function<byte[], M> decoder,
263 Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700264 Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700265 messagingService.registerHandler(subject.value(),
266 new InternalMessageConsumer<>(decoder, handler),
267 executor);
268 }
269
Madan Jampanic26eede2015-04-16 11:42:16 -0700270 private class InternalClusterMessageHandler implements Function<byte[], byte[]> {
271 private ClusterMessageHandler handler;
272
273 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
274 this.handler = handler;
275 }
276
277 @Override
278 public byte[] apply(byte[] bytes) {
279 ClusterMessage message = ClusterMessage.fromBytes(bytes);
280 handler.handle(message);
281 return message.response();
282 }
283 }
284
Madan Jampani27b69c62015-05-15 15:49:02 -0700285 private class InternalMessageResponder<M, R> implements Function<byte[], CompletableFuture<byte[]>> {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700286 private final Function<byte[], M> decoder;
287 private final Function<R, byte[]> encoder;
Madan Jampani27b69c62015-05-15 15:49:02 -0700288 private final Function<M, CompletableFuture<R>> handler;
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700289
290 public InternalMessageResponder(Function<byte[], M> decoder,
291 Function<R, byte[]> encoder,
Madan Jampani27b69c62015-05-15 15:49:02 -0700292 Function<M, CompletableFuture<R>> handler) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700293 this.decoder = decoder;
294 this.encoder = encoder;
295 this.handler = handler;
296 }
Madan Jampanic26eede2015-04-16 11:42:16 -0700297
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700298 @Override
Madan Jampani27b69c62015-05-15 15:49:02 -0700299 public CompletableFuture<byte[]> apply(byte[] bytes) {
300 return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700301 }
302 }
303
Madan Jampanic26eede2015-04-16 11:42:16 -0700304 private class InternalMessageConsumer<M> implements Consumer<byte[]> {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700305 private final Function<byte[], M> decoder;
306 private final Consumer<M> consumer;
307
308 public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
309 this.decoder = decoder;
310 this.consumer = consumer;
311 }
Madan Jampani8a895092014-10-17 16:55:50 -0700312
313 @Override
Madan Jampanic26eede2015-04-16 11:42:16 -0700314 public void accept(byte[] bytes) {
315 consumer.accept(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
Madan Jampani8a895092014-10-17 16:55:50 -0700316 }
317 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800318}