blob: df4ac5c5110a96dfc6051913dc8d9232f558c601 [file] [log] [blame]
/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani890bc352014-10-01 22:35:29 -070017
Madan Jampani890bc352014-10-01 22:35:29 -070018import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070021import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070023import org.apache.felix.scr.annotations.Service;
Madan Jampani27b69c62015-05-15 15:49:02 -070024import org.onlab.util.Tools;
Brian O'Connorabafb502014-12-02 22:26:20 -080025import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.ControllerNode;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
29import org.onosproject.store.cluster.messaging.ClusterMessage;
30import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Madan Jampanic26eede2015-04-16 11:42:16 -070031import org.onosproject.store.cluster.messaging.Endpoint;
Brian O'Connorabafb502014-12-02 22:26:20 -080032import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampanic26eede2015-04-16 11:42:16 -070033import org.onosproject.store.cluster.messaging.MessagingService;
Madan Jampani890bc352014-10-01 22:35:29 -070034import org.slf4j.Logger;
35import org.slf4j.LoggerFactory;
36
Madan Jampani2bfa94c2015-04-11 05:03:49 -070037import com.google.common.base.Objects;
Madan Jampanid36def02016-01-13 11:21:56 -080038
Madan Jampani2bfa94c2015-04-11 05:03:49 -070039import java.util.Set;
40import java.util.concurrent.CompletableFuture;
Madan Jampaniec5ae342015-04-13 15:43:10 -070041import java.util.concurrent.Executor;
Madan Jampani2af244a2015-02-22 13:12:01 -080042import java.util.concurrent.ExecutorService;
Madan Jampanid36def02016-01-13 11:21:56 -080043import java.util.function.BiConsumer;
44import java.util.function.BiFunction;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070045import java.util.function.Consumer;
46import java.util.function.Function;
47import java.util.stream.Collectors;
Jonathan Hart7d656f42015-01-27 14:07:23 -080048
49import static com.google.common.base.Preconditions.checkArgument;
Madan Jampani24f9efb2014-10-24 18:56:23 -070050
Madan Jampani890bc352014-10-01 22:35:29 -070051@Component(immediate = true)
52@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070053public class ClusterCommunicationManager
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070054 implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070055
56 private final Logger log = LoggerFactory.getLogger(getClass());
57
Madan Jampania5d0d782014-10-07 14:36:00 -070058 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
59 private ClusterService clusterService;
60
Madan Jampaniafeebbd2015-05-19 15:26:01 -070061 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
62 protected MessagingService messagingService;
Madan Jampanic26eede2015-04-16 11:42:16 -070063
Madan Jampani175e8fd2015-05-20 14:10:45 -070064 private NodeId localNodeId;
65
Madan Jampani890bc352014-10-01 22:35:29 -070066 @Activate
67 public void activate() {
Madan Jampani175e8fd2015-05-20 14:10:45 -070068 localNodeId = clusterService.getLocalNode().id();
Madan Jampaniafeebbd2015-05-19 15:26:01 -070069 log.info("Started");
Madan Jampani890bc352014-10-01 22:35:29 -070070 }
71
72 @Deactivate
73 public void deactivate() {
Madan Jampani890bc352014-10-01 22:35:29 -070074 log.info("Stopped");
75 }
76
77 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -070078 public <M> void broadcast(M message,
79 MessageSubject subject,
80 Function<M, byte[]> encoder) {
81 multicast(message,
82 subject,
83 encoder,
84 clusterService.getNodes()
85 .stream()
86 .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
87 .map(ControllerNode::id)
88 .collect(Collectors.toSet()));
89 }
90
91 @Override
92 public <M> void broadcastIncludeSelf(M message,
93 MessageSubject subject,
94 Function<M, byte[]> encoder) {
95 multicast(message,
96 subject,
97 encoder,
98 clusterService.getNodes()
99 .stream()
100 .map(ControllerNode::id)
101 .collect(Collectors.toSet()));
102 }
103
104 @Override
Madan Jampani175e8fd2015-05-20 14:10:45 -0700105 public <M> CompletableFuture<Void> unicast(M message,
106 MessageSubject subject,
107 Function<M, byte[]> encoder,
108 NodeId toNodeId) {
109 try {
110 byte[] payload = new ClusterMessage(
111 localNodeId,
112 subject,
113 encoder.apply(message)).getBytes();
114 return doUnicast(subject, payload, toNodeId);
115 } catch (Exception e) {
116 return Tools.exceptionalFuture(e);
117 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700118 }
119
120 @Override
121 public <M> void multicast(M message,
122 MessageSubject subject,
123 Function<M, byte[]> encoder,
124 Set<NodeId> nodes) {
125 byte[] payload = new ClusterMessage(
Madan Jampani175e8fd2015-05-20 14:10:45 -0700126 localNodeId,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700127 subject,
128 encoder.apply(message)).getBytes();
Madan Jampani175e8fd2015-05-20 14:10:45 -0700129 nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700130 }
131
132 @Override
133 public <M, R> CompletableFuture<R> sendAndReceive(M message,
134 MessageSubject subject,
135 Function<M, byte[]> encoder,
136 Function<byte[], R> decoder,
137 NodeId toNodeId) {
Madan Jampani27b69c62015-05-15 15:49:02 -0700138 try {
139 ClusterMessage envelope = new ClusterMessage(
140 clusterService.getLocalNode().id(),
141 subject,
142 encoder.apply(message));
143 return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
144 } catch (Exception e) {
145 return Tools.exceptionalFuture(e);
146 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700147 }
148
Madan Jampani175e8fd2015-05-20 14:10:45 -0700149 private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700150 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700151 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800152 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani175e8fd2015-05-20 14:10:45 -0700153 return messagingService.sendAsync(nodeEp, subject.value(), payload);
Madan Jampani98c17602014-10-23 15:33:23 -0700154 }
155
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700156 private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700157 ControllerNode node = clusterService.getNode(toNodeId);
158 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800159 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700160 return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
Madan Jampani890bc352014-10-01 22:35:29 -0700161 }
162
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800163 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800164 public void addSubscriber(MessageSubject subject,
165 ClusterMessageHandler subscriber,
166 ExecutorService executor) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700167 messagingService.registerHandler(subject.value(),
168 new InternalClusterMessageHandler(subscriber),
169 executor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800170 }
171
172 @Override
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800173 public void removeSubscriber(MessageSubject subject) {
174 messagingService.unregisterHandler(subject.value());
175 }
176
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700177 @Override
178 public <M, R> void addSubscriber(MessageSubject subject,
179 Function<byte[], M> decoder,
180 Function<M, R> handler,
181 Function<R, byte[]> encoder,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700182 Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700183 messagingService.registerHandler(subject.value(),
Madan Jampani27b69c62015-05-15 15:49:02 -0700184 new InternalMessageResponder<M, R>(decoder, encoder, m -> {
185 CompletableFuture<R> responseFuture = new CompletableFuture<>();
186 executor.execute(() -> {
187 try {
188 responseFuture.complete(handler.apply(m));
189 } catch (Exception e) {
190 responseFuture.completeExceptionally(e);
191 }
192 });
193 return responseFuture;
194 }));
195 }
196
197 @Override
198 public <M, R> void addSubscriber(MessageSubject subject,
199 Function<byte[], M> decoder,
200 Function<M, CompletableFuture<R>> handler,
201 Function<R, byte[]> encoder) {
202 messagingService.registerHandler(subject.value(),
203 new InternalMessageResponder<>(decoder, encoder, handler));
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700204 }
205
206 @Override
207 public <M> void addSubscriber(MessageSubject subject,
208 Function<byte[], M> decoder,
209 Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700210 Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700211 messagingService.registerHandler(subject.value(),
212 new InternalMessageConsumer<>(decoder, handler),
213 executor);
214 }
215
Madan Jampanid36def02016-01-13 11:21:56 -0800216 private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
Madan Jampanic26eede2015-04-16 11:42:16 -0700217 private ClusterMessageHandler handler;
218
219 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
220 this.handler = handler;
221 }
222
223 @Override
Madan Jampanid36def02016-01-13 11:21:56 -0800224 public byte[] apply(Endpoint sender, byte[] bytes) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700225 ClusterMessage message = ClusterMessage.fromBytes(bytes);
226 handler.handle(message);
227 return message.response();
228 }
229 }
230
Madan Jampanid36def02016-01-13 11:21:56 -0800231 private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700232 private final Function<byte[], M> decoder;
233 private final Function<R, byte[]> encoder;
Madan Jampani27b69c62015-05-15 15:49:02 -0700234 private final Function<M, CompletableFuture<R>> handler;
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700235
236 public InternalMessageResponder(Function<byte[], M> decoder,
237 Function<R, byte[]> encoder,
Madan Jampani27b69c62015-05-15 15:49:02 -0700238 Function<M, CompletableFuture<R>> handler) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700239 this.decoder = decoder;
240 this.encoder = encoder;
241 this.handler = handler;
242 }
Madan Jampanic26eede2015-04-16 11:42:16 -0700243
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700244 @Override
Madan Jampanid36def02016-01-13 11:21:56 -0800245 public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
Madan Jampani27b69c62015-05-15 15:49:02 -0700246 return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700247 }
248 }
249
Madan Jampanid36def02016-01-13 11:21:56 -0800250 private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700251 private final Function<byte[], M> decoder;
252 private final Consumer<M> consumer;
253
254 public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
255 this.decoder = decoder;
256 this.consumer = consumer;
257 }
Madan Jampani8a895092014-10-17 16:55:50 -0700258
259 @Override
Madan Jampanid36def02016-01-13 11:21:56 -0800260 public void accept(Endpoint sender, byte[] bytes) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700261 consumer.accept(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
Madan Jampani8a895092014-10-17 16:55:50 -0700262 }
263 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800264}