blob: ffde400e0d514de7e4146d48a4fdb94d60fc87a9 [file] [log] [blame]
/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani890bc352014-10-01 22:35:29 -070017
Madan Jampani890bc352014-10-01 22:35:29 -070018import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070021import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070023import org.apache.felix.scr.annotations.Service;
Madan Jampani27b69c62015-05-15 15:49:02 -070024import org.onlab.util.Tools;
Brian O'Connorabafb502014-12-02 22:26:20 -080025import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.ControllerNode;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
29import org.onosproject.store.cluster.messaging.ClusterMessage;
30import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Madan Jampanic26eede2015-04-16 11:42:16 -070031import org.onosproject.store.cluster.messaging.Endpoint;
Brian O'Connorabafb502014-12-02 22:26:20 -080032import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampanic26eede2015-04-16 11:42:16 -070033import org.onosproject.store.cluster.messaging.MessagingService;
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -080034import org.onosproject.utils.MeteringAgent;
Madan Jampani890bc352014-10-01 22:35:29 -070035import org.slf4j.Logger;
36import org.slf4j.LoggerFactory;
37
Madan Jampani2bfa94c2015-04-11 05:03:49 -070038import com.google.common.base.Objects;
Madan Jampanid36def02016-01-13 11:21:56 -080039
Madan Jampani2bfa94c2015-04-11 05:03:49 -070040import java.util.Set;
41import java.util.concurrent.CompletableFuture;
Madan Jampaniec5ae342015-04-13 15:43:10 -070042import java.util.concurrent.Executor;
Madan Jampani2af244a2015-02-22 13:12:01 -080043import java.util.concurrent.ExecutorService;
Madan Jampanid36def02016-01-13 11:21:56 -080044import java.util.function.BiConsumer;
45import java.util.function.BiFunction;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070046import java.util.function.Consumer;
47import java.util.function.Function;
48import java.util.stream.Collectors;
Jonathan Hart7d656f42015-01-27 14:07:23 -080049
50import static com.google.common.base.Preconditions.checkArgument;
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -080051import static com.google.common.base.Preconditions.checkNotNull;
Madan Jampani24f9efb2014-10-24 18:56:23 -070052
Madan Jampani890bc352014-10-01 22:35:29 -070053@Component(immediate = true)
54@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070055public class ClusterCommunicationManager
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070056 implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070057
58 private final Logger log = LoggerFactory.getLogger(getClass());
59
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -080060 private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
61 private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
62
63 private static final String PRIMITIVE_NAME = "clusterCommunication";
64 private static final String SUBJECT_PREFIX = "subject";
65 private static final String ENDPOINT_PREFIX = "endpoint";
66
67 private static final String SERIALIZING = "serialization";
68 private static final String DESERIALIZING = "deserialization";
69 private static final String NODE_PREFIX = "node:";
70 private static final String ROUND_TRIP_SUFFIX = ".rtt";
71
Madan Jampania5d0d782014-10-07 14:36:00 -070072 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 private ClusterService clusterService;
74
Madan Jampaniafeebbd2015-05-19 15:26:01 -070075 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected MessagingService messagingService;
Madan Jampanic26eede2015-04-16 11:42:16 -070077
Madan Jampani175e8fd2015-05-20 14:10:45 -070078 private NodeId localNodeId;
79
Madan Jampani890bc352014-10-01 22:35:29 -070080 @Activate
81 public void activate() {
Madan Jampani175e8fd2015-05-20 14:10:45 -070082 localNodeId = clusterService.getLocalNode().id();
Madan Jampaniafeebbd2015-05-19 15:26:01 -070083 log.info("Started");
Madan Jampani890bc352014-10-01 22:35:29 -070084 }
85
86 @Deactivate
87 public void deactivate() {
Madan Jampani890bc352014-10-01 22:35:29 -070088 log.info("Stopped");
89 }
90
91 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -070092 public <M> void broadcast(M message,
93 MessageSubject subject,
94 Function<M, byte[]> encoder) {
95 multicast(message,
96 subject,
97 encoder,
98 clusterService.getNodes()
99 .stream()
100 .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
101 .map(ControllerNode::id)
102 .collect(Collectors.toSet()));
103 }
104
105 @Override
106 public <M> void broadcastIncludeSelf(M message,
107 MessageSubject subject,
108 Function<M, byte[]> encoder) {
109 multicast(message,
110 subject,
111 encoder,
112 clusterService.getNodes()
113 .stream()
114 .map(ControllerNode::id)
115 .collect(Collectors.toSet()));
116 }
117
118 @Override
Madan Jampani175e8fd2015-05-20 14:10:45 -0700119 public <M> CompletableFuture<Void> unicast(M message,
120 MessageSubject subject,
121 Function<M, byte[]> encoder,
122 NodeId toNodeId) {
123 try {
124 byte[] payload = new ClusterMessage(
125 localNodeId,
126 subject,
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800127 timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
128 ).getBytes();
Madan Jampani175e8fd2015-05-20 14:10:45 -0700129 return doUnicast(subject, payload, toNodeId);
130 } catch (Exception e) {
131 return Tools.exceptionalFuture(e);
132 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700133 }
134
135 @Override
136 public <M> void multicast(M message,
137 MessageSubject subject,
138 Function<M, byte[]> encoder,
139 Set<NodeId> nodes) {
140 byte[] payload = new ClusterMessage(
Madan Jampani175e8fd2015-05-20 14:10:45 -0700141 localNodeId,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700142 subject,
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800143 timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
144 .getBytes();
Madan Jampani175e8fd2015-05-20 14:10:45 -0700145 nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700146 }
147
148 @Override
149 public <M, R> CompletableFuture<R> sendAndReceive(M message,
150 MessageSubject subject,
151 Function<M, byte[]> encoder,
152 Function<byte[], R> decoder,
153 NodeId toNodeId) {
Madan Jampani27b69c62015-05-15 15:49:02 -0700154 try {
155 ClusterMessage envelope = new ClusterMessage(
156 clusterService.getLocalNode().id(),
157 subject,
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800158 timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
159 apply(message));
160 return sendAndReceive(subject, envelope.getBytes(), toNodeId).
161 thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
Madan Jampani27b69c62015-05-15 15:49:02 -0700162 } catch (Exception e) {
163 return Tools.exceptionalFuture(e);
164 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700165 }
166
Madan Jampani175e8fd2015-05-20 14:10:45 -0700167 private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700168 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700169 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800170 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Madan Jampani175e8fd2015-05-20 14:10:45 -0700171 return messagingService.sendAsync(nodeEp, subject.value(), payload);
Madan Jampani98c17602014-10-23 15:33:23 -0700172 }
173
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700174 private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700175 ControllerNode node = clusterService.getNode(toNodeId);
176 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800177 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800178 final MeteringAgent.Context epContext = endpointMeteringAgent.
179 startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
180 final MeteringAgent.Context subjectContext = subjectMeteringAgent.
181 startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
182 return messagingService.sendAndReceive(nodeEp, subject.value(), payload).
183 whenComplete((bytes, throwable) -> {
184 subjectContext.stop(throwable);
185 epContext.stop(throwable);
186 });
Madan Jampani890bc352014-10-01 22:35:29 -0700187 }
188
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800189 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800190 public void addSubscriber(MessageSubject subject,
191 ClusterMessageHandler subscriber,
192 ExecutorService executor) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700193 messagingService.registerHandler(subject.value(),
194 new InternalClusterMessageHandler(subscriber),
195 executor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800196 }
197
198 @Override
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800199 public void removeSubscriber(MessageSubject subject) {
200 messagingService.unregisterHandler(subject.value());
201 }
202
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700203 @Override
204 public <M, R> void addSubscriber(MessageSubject subject,
205 Function<byte[], M> decoder,
206 Function<M, R> handler,
207 Function<R, byte[]> encoder,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700208 Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700209 messagingService.registerHandler(subject.value(),
Madan Jampani27b69c62015-05-15 15:49:02 -0700210 new InternalMessageResponder<M, R>(decoder, encoder, m -> {
211 CompletableFuture<R> responseFuture = new CompletableFuture<>();
212 executor.execute(() -> {
213 try {
214 responseFuture.complete(handler.apply(m));
215 } catch (Exception e) {
216 responseFuture.completeExceptionally(e);
217 }
218 });
219 return responseFuture;
220 }));
221 }
222
223 @Override
224 public <M, R> void addSubscriber(MessageSubject subject,
225 Function<byte[], M> decoder,
226 Function<M, CompletableFuture<R>> handler,
227 Function<R, byte[]> encoder) {
228 messagingService.registerHandler(subject.value(),
229 new InternalMessageResponder<>(decoder, encoder, handler));
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700230 }
231
232 @Override
233 public <M> void addSubscriber(MessageSubject subject,
234 Function<byte[], M> decoder,
235 Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700236 Executor executor) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700237 messagingService.registerHandler(subject.value(),
238 new InternalMessageConsumer<>(decoder, handler),
239 executor);
240 }
241
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800242 /**
243 * Performs the timed function, returning the value it would while timing the operation.
244 *
245 * @param timedFunction the function to be timed
246 * @param meter the metering agent to be used to time the function
247 * @param opName the opname to be used when starting the meter
248 * @param <A> The param type of the function
249 * @param <B> The return type of the function
250 * @return the value returned by the timed function
251 */
252 private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
253 MeteringAgent meter, String opName) {
254 checkNotNull(timedFunction);
255 checkNotNull(meter);
256 checkNotNull(opName);
257 return new Function<A, B>() {
258 @Override
259 public B apply(A a) {
260 final MeteringAgent.Context context = meter.startTimer(opName);
261 B result = null;
262 try {
263 result = timedFunction.apply(a);
264 } catch (Exception e) {
265 context.stop(e);
266 throw new RuntimeException(e);
267 } finally {
268 context.stop(null);
269 return result;
270 }
271 }
272 };
273 }
274
275
Madan Jampanid36def02016-01-13 11:21:56 -0800276 private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
Madan Jampanic26eede2015-04-16 11:42:16 -0700277 private ClusterMessageHandler handler;
278
279 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
280 this.handler = handler;
281 }
282
283 @Override
Madan Jampanid36def02016-01-13 11:21:56 -0800284 public byte[] apply(Endpoint sender, byte[] bytes) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700285 ClusterMessage message = ClusterMessage.fromBytes(bytes);
286 handler.handle(message);
287 return message.response();
288 }
289 }
290
Madan Jampanid36def02016-01-13 11:21:56 -0800291 private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700292 private final Function<byte[], M> decoder;
293 private final Function<R, byte[]> encoder;
Madan Jampani27b69c62015-05-15 15:49:02 -0700294 private final Function<M, CompletableFuture<R>> handler;
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700295
296 public InternalMessageResponder(Function<byte[], M> decoder,
297 Function<R, byte[]> encoder,
Madan Jampani27b69c62015-05-15 15:49:02 -0700298 Function<M, CompletableFuture<R>> handler) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700299 this.decoder = decoder;
300 this.encoder = encoder;
301 this.handler = handler;
302 }
Madan Jampanic26eede2015-04-16 11:42:16 -0700303
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700304 @Override
Madan Jampanid36def02016-01-13 11:21:56 -0800305 public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800306 return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
307 apply(ClusterMessage.fromBytes(bytes).payload())).
308 thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700309 }
310 }
311
Madan Jampanid36def02016-01-13 11:21:56 -0800312 private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700313 private final Function<byte[], M> decoder;
314 private final Consumer<M> consumer;
315
316 public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
317 this.decoder = decoder;
318 this.consumer = consumer;
319 }
Madan Jampani8a895092014-10-17 16:55:50 -0700320
321 @Override
Madan Jampanid36def02016-01-13 11:21:56 -0800322 public void accept(Endpoint sender, byte[] bytes) {
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800323 consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
324 apply(ClusterMessage.fromBytes(bytes).payload()));
Madan Jampani8a895092014-10-17 16:55:50 -0700325 }
326 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800327}