blob: 7475819fb8b0f6a3a259e9c7b0c0ee02508bdbd4 [file] [log] [blame]
/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani890bc352014-10-01 22:35:29 -070017
Madan Jampani890bc352014-10-01 22:35:29 -070018import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070021import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070023import org.apache.felix.scr.annotations.Service;
Brian O'Connorabafb502014-12-02 22:26:20 -080024import org.onosproject.cluster.ClusterService;
25import org.onosproject.cluster.ControllerNode;
26import org.onosproject.cluster.NodeId;
27import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
28import org.onosproject.store.cluster.messaging.ClusterMessage;
29import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Madan Jampanic26eede2015-04-16 11:42:16 -070030import org.onosproject.store.cluster.messaging.Endpoint;
Brian O'Connorabafb502014-12-02 22:26:20 -080031import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampanic26eede2015-04-16 11:42:16 -070032import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampani890bc352014-10-01 22:35:29 -070033import org.slf4j.Logger;
34import org.slf4j.LoggerFactory;
35
Madan Jampani2bfa94c2015-04-11 05:03:49 -070036import com.google.common.base.Objects;
37import com.google.common.util.concurrent.ListenableFuture;
38import com.google.common.util.concurrent.SettableFuture;
39
Jonathan Hart7d656f42015-01-27 14:07:23 -080040import java.io.IOException;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070041import java.util.Set;
42import java.util.concurrent.CompletableFuture;
Madan Jampaniec5ae342015-04-13 15:43:10 -070043import java.util.concurrent.Executor;
Madan Jampani2af244a2015-02-22 13:12:01 -080044import java.util.concurrent.ExecutorService;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070045import java.util.function.Consumer;
46import java.util.function.Function;
47import java.util.stream.Collectors;
Jonathan Hart7d656f42015-01-27 14:07:23 -080048
49import static com.google.common.base.Preconditions.checkArgument;
Madan Jampani24f9efb2014-10-24 18:56:23 -070050
Madan Jampani890bc352014-10-01 22:35:29 -070051@Component(immediate = true)
52@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070053public class ClusterCommunicationManager
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070054 implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070055
56 private final Logger log = LoggerFactory.getLogger(getClass());
57
Madan Jampania5d0d782014-10-07 14:36:00 -070058 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
59 private ClusterService clusterService;
60
Madan Jampaniafeebbd2015-05-19 15:26:01 -070061 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
62 protected MessagingService messagingService;
Madan Jampanic26eede2015-04-16 11:42:16 -070063
Madan Jampani890bc352014-10-01 22:35:29 -070064 @Activate
65 public void activate() {
Madan Jampaniafeebbd2015-05-19 15:26:01 -070066 log.info("Started");
Madan Jampani890bc352014-10-01 22:35:29 -070067 }
68
69 @Deactivate
70 public void deactivate() {
Madan Jampani890bc352014-10-01 22:35:29 -070071 log.info("Stopped");
72 }
73
74 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -080075 public boolean broadcast(ClusterMessage message) {
Madan Jampani890bc352014-10-01 22:35:29 -070076 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070077 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampaniba472232015-03-04 13:00:50 -080078 byte[] payload = message.getBytes();
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -070079 for (ControllerNode node : clusterService.getNodes()) {
Madan Jampani890bc352014-10-01 22:35:29 -070080 if (!node.equals(localNode)) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -080081 ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -070082 }
83 }
84 return ok;
85 }
86
87 @Override
Jonathan Hart7d656f42015-01-27 14:07:23 -080088 public boolean broadcastIncludeSelf(ClusterMessage message) {
Madan Jampanif5d263b2014-11-13 10:04:40 -080089 boolean ok = true;
Madan Jampaniba472232015-03-04 13:00:50 -080090 byte[] payload = message.getBytes();
Madan Jampanif5d263b2014-11-13 10:04:40 -080091 for (ControllerNode node : clusterService.getNodes()) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -080092 ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
Madan Jampanif5d263b2014-11-13 10:04:40 -080093 }
94 return ok;
95 }
96
97 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -080098 public boolean multicast(ClusterMessage message, Iterable<NodeId> nodes) {
Madan Jampani890bc352014-10-01 22:35:29 -070099 boolean ok = true;
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -0700100 final ControllerNode localNode = clusterService.getLocalNode();
Madan Jampaniba472232015-03-04 13:00:50 -0800101 byte[] payload = message.getBytes();
Madan Jampani890bc352014-10-01 22:35:29 -0700102 for (NodeId nodeId : nodes) {
103 if (!nodeId.equals(localNode.id())) {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800104 ok = unicastUnchecked(message.subject(), payload, nodeId) && ok;
Madan Jampani890bc352014-10-01 22:35:29 -0700105 }
106 }
107 return ok;
108 }
109
110 @Override
Brian O'Connor5eb77c82015-03-02 18:09:39 -0800111 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
112 return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800113 }
114
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700115 @Override
116 public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
117 SettableFuture<byte[]> response = SettableFuture.create();
118 sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((r, e) -> {
119 if (e == null) {
120 response.set(r);
121 } else {
122 response.setException(e);
123 }
124 });
125 return response;
126 }
127
128 @Override
129 public <M> void broadcast(M message,
130 MessageSubject subject,
131 Function<M, byte[]> encoder) {
132 multicast(message,
133 subject,
134 encoder,
135 clusterService.getNodes()
136 .stream()
137 .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
138 .map(ControllerNode::id)
139 .collect(Collectors.toSet()));
140 }
141
142 @Override
143 public <M> void broadcastIncludeSelf(M message,
144 MessageSubject subject,
145 Function<M, byte[]> encoder) {
146 multicast(message,
147 subject,
148 encoder,
149 clusterService.getNodes()
150 .stream()
151 .map(ControllerNode::id)
152 .collect(Collectors.toSet()));
153 }
154
155 @Override
156 public <M> boolean unicast(M message,
157 MessageSubject subject,
158 Function<M, byte[]> encoder,
159 NodeId toNodeId) {
160 byte[] payload = new ClusterMessage(
161 clusterService.getLocalNode().id(),
162 subject,
163 encoder.apply(message)).getBytes();
164 return unicastUnchecked(subject, payload, toNodeId);
165 }
166
167 @Override
168 public <M> void multicast(M message,
169 MessageSubject subject,
170 Function<M, byte[]> encoder,
171 Set<NodeId> nodes) {
172 byte[] payload = new ClusterMessage(
173 clusterService.getLocalNode().id(),
174 subject,
175 encoder.apply(message)).getBytes();
176 nodes.forEach(nodeId -> unicastUnchecked(subject, payload, nodeId));
177 }
178
179 @Override
180 public <M, R> CompletableFuture<R> sendAndReceive(M message,
181 MessageSubject subject,
182 Function<M, byte[]> encoder,
183 Function<byte[], R> decoder,
184 NodeId toNodeId) {
185 ClusterMessage envelope = new ClusterMessage(
186 clusterService.getLocalNode().id(),
187 subject,
188 encoder.apply(message));
189 return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
190 }
191
192 private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700193 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700194 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800195 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani890bc352014-10-01 22:35:29 -0700196 try {
Brian O'Connor9b7a32d2015-02-19 16:03:45 -0800197 messagingService.sendAsync(nodeEp, subject.value(), payload);
Madan Jampani890bc352014-10-01 22:35:29 -0700198 return true;
Yuta HIGUCHI053b7d72014-10-07 23:03:20 -0700199 } catch (IOException e) {
Madan Jampania14047d2015-02-25 12:23:02 -0800200 log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
Madan Jampani98c17602014-10-23 15:33:23 -0700201 return false;
202 }
203 }
204
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700205 private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700206 ControllerNode node = clusterService.getNode(toNodeId);
207 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800208 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700209 return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
Madan Jampani890bc352014-10-01 22:35:29 -0700210 }
211
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800212 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800213 public void addSubscriber(MessageSubject subject,
214 ClusterMessageHandler subscriber,
215 ExecutorService executor) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700216 messagingService.registerHandler(subject.value(),
217 new InternalClusterMessageHandler(subscriber),
218 executor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800219 }
220
221 @Override
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800222 public void removeSubscriber(MessageSubject subject) {
223 messagingService.unregisterHandler(subject.value());
224 }
225
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700226
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700227 @Override
228 public <M, R> void addSubscriber(MessageSubject subject,
229 Function<byte[], M> decoder,
230 Function<M, R> handler,
231 Function<R, byte[]> encoder,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700232 Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700233 messagingService.registerHandler(subject.value(),
234 new InternalMessageResponder<>(decoder, encoder, handler),
235 executor);
236 }
237
238 @Override
239 public <M> void addSubscriber(MessageSubject subject,
240 Function<byte[], M> decoder,
241 Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700242 Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700243 messagingService.registerHandler(subject.value(),
244 new InternalMessageConsumer<>(decoder, handler),
245 executor);
246 }
247
Madan Jampanic26eede2015-04-16 11:42:16 -0700248 private class InternalClusterMessageHandler implements Function<byte[], byte[]> {
249 private ClusterMessageHandler handler;
250
251 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
252 this.handler = handler;
253 }
254
255 @Override
256 public byte[] apply(byte[] bytes) {
257 ClusterMessage message = ClusterMessage.fromBytes(bytes);
258 handler.handle(message);
259 return message.response();
260 }
261 }
262
263 private class InternalMessageResponder<M, R> implements Function<byte[], byte[]> {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700264 private final Function<byte[], M> decoder;
265 private final Function<R, byte[]> encoder;
266 private final Function<M, R> handler;
267
268 public InternalMessageResponder(Function<byte[], M> decoder,
269 Function<R, byte[]> encoder,
270 Function<M, R> handler) {
271 this.decoder = decoder;
272 this.encoder = encoder;
273 this.handler = handler;
274 }
Madan Jampanic26eede2015-04-16 11:42:16 -0700275
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700276 @Override
Madan Jampanic26eede2015-04-16 11:42:16 -0700277 public byte[] apply(byte[] bytes) {
278 R reply = handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
279 return encoder.apply(reply);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700280 }
281 }
282
Madan Jampanic26eede2015-04-16 11:42:16 -0700283 private class InternalMessageConsumer<M> implements Consumer<byte[]> {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700284 private final Function<byte[], M> decoder;
285 private final Consumer<M> consumer;
286
287 public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
288 this.decoder = decoder;
289 this.consumer = consumer;
290 }
Madan Jampani8a895092014-10-17 16:55:50 -0700291
292 @Override
Madan Jampanic26eede2015-04-16 11:42:16 -0700293 public void accept(byte[] bytes) {
294 consumer.accept(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
Madan Jampani8a895092014-10-17 16:55:50 -0700295 }
296 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800297}