blob: 5410fa89f38ed34d75842c67b421aa7f31c4490d [file] [log] [blame]
/*
 * Copyright 2014-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani890bc352014-10-01 22:35:29 -070017
Aaron Kruglikove2411892016-02-10 18:18:17 -080018import com.google.common.base.Throwables;
Madan Jampani890bc352014-10-01 22:35:29 -070019import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070022import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070024import org.apache.felix.scr.annotations.Service;
Madan Jampani27b69c62015-05-15 15:49:02 -070025import org.onlab.util.Tools;
Brian O'Connorabafb502014-12-02 22:26:20 -080026import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.ControllerNode;
28import org.onosproject.cluster.NodeId;
29import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
30import org.onosproject.store.cluster.messaging.ClusterMessage;
31import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Madan Jampanic26eede2015-04-16 11:42:16 -070032import org.onosproject.store.cluster.messaging.Endpoint;
Brian O'Connorabafb502014-12-02 22:26:20 -080033import org.onosproject.store.cluster.messaging.MessageSubject;
Madan Jampanic26eede2015-04-16 11:42:16 -070034import org.onosproject.store.cluster.messaging.MessagingService;
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -080035import org.onosproject.utils.MeteringAgent;
Madan Jampani890bc352014-10-01 22:35:29 -070036import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38
Madan Jampani2bfa94c2015-04-11 05:03:49 -070039import com.google.common.base.Objects;
Madan Jampanid36def02016-01-13 11:21:56 -080040
Madan Jampani2bfa94c2015-04-11 05:03:49 -070041import java.util.Set;
42import java.util.concurrent.CompletableFuture;
Madan Jampaniec5ae342015-04-13 15:43:10 -070043import java.util.concurrent.Executor;
Madan Jampani2af244a2015-02-22 13:12:01 -080044import java.util.concurrent.ExecutorService;
Madan Jampanid36def02016-01-13 11:21:56 -080045import java.util.function.BiConsumer;
46import java.util.function.BiFunction;
Madan Jampani2bfa94c2015-04-11 05:03:49 -070047import java.util.function.Consumer;
48import java.util.function.Function;
49import java.util.stream.Collectors;
Jonathan Hart7d656f42015-01-27 14:07:23 -080050
51import static com.google.common.base.Preconditions.checkArgument;
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -080052import static com.google.common.base.Preconditions.checkNotNull;
Heedo Kang4a47a302016-02-29 17:40:23 +090053import static org.onosproject.security.AppGuard.checkPermission;
54import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
Madan Jampani24f9efb2014-10-24 18:56:23 -070055
Madan Jampani890bc352014-10-01 22:35:29 -070056@Component(immediate = true)
57@Service
Madan Jampani3b0dfd52014-10-02 16:48:13 -070058public class ClusterCommunicationManager
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070059 implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070060
61 private final Logger log = LoggerFactory.getLogger(getClass());
62
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -080063 private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
64 private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
65
66 private static final String PRIMITIVE_NAME = "clusterCommunication";
67 private static final String SUBJECT_PREFIX = "subject";
68 private static final String ENDPOINT_PREFIX = "endpoint";
69
70 private static final String SERIALIZING = "serialization";
71 private static final String DESERIALIZING = "deserialization";
72 private static final String NODE_PREFIX = "node:";
73 private static final String ROUND_TRIP_SUFFIX = ".rtt";
Aaron Kruglikove2411892016-02-10 18:18:17 -080074 private static final String ONE_WAY_SUFFIX = ".oneway";
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -080075
Madan Jampania5d0d782014-10-07 14:36:00 -070076 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77 private ClusterService clusterService;
78
Madan Jampaniafeebbd2015-05-19 15:26:01 -070079 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 protected MessagingService messagingService;
Madan Jampanic26eede2015-04-16 11:42:16 -070081
Madan Jampani175e8fd2015-05-20 14:10:45 -070082 private NodeId localNodeId;
83
Madan Jampani890bc352014-10-01 22:35:29 -070084 @Activate
85 public void activate() {
Madan Jampani175e8fd2015-05-20 14:10:45 -070086 localNodeId = clusterService.getLocalNode().id();
Madan Jampaniafeebbd2015-05-19 15:26:01 -070087 log.info("Started");
Madan Jampani890bc352014-10-01 22:35:29 -070088 }
89
90 @Deactivate
91 public void deactivate() {
Madan Jampani890bc352014-10-01 22:35:29 -070092 log.info("Stopped");
93 }
94
95 @Override
Madan Jampani2bfa94c2015-04-11 05:03:49 -070096 public <M> void broadcast(M message,
97 MessageSubject subject,
98 Function<M, byte[]> encoder) {
Heedo Kang4a47a302016-02-29 17:40:23 +090099 checkPermission(CLUSTER_WRITE);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700100 multicast(message,
101 subject,
102 encoder,
103 clusterService.getNodes()
104 .stream()
105 .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
106 .map(ControllerNode::id)
107 .collect(Collectors.toSet()));
108 }
109
110 @Override
111 public <M> void broadcastIncludeSelf(M message,
112 MessageSubject subject,
113 Function<M, byte[]> encoder) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900114 checkPermission(CLUSTER_WRITE);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700115 multicast(message,
116 subject,
117 encoder,
118 clusterService.getNodes()
119 .stream()
120 .map(ControllerNode::id)
121 .collect(Collectors.toSet()));
122 }
123
124 @Override
Madan Jampani175e8fd2015-05-20 14:10:45 -0700125 public <M> CompletableFuture<Void> unicast(M message,
126 MessageSubject subject,
127 Function<M, byte[]> encoder,
128 NodeId toNodeId) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900129 checkPermission(CLUSTER_WRITE);
Madan Jampani175e8fd2015-05-20 14:10:45 -0700130 try {
131 byte[] payload = new ClusterMessage(
132 localNodeId,
133 subject,
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800134 timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
135 ).getBytes();
Madan Jampani175e8fd2015-05-20 14:10:45 -0700136 return doUnicast(subject, payload, toNodeId);
137 } catch (Exception e) {
138 return Tools.exceptionalFuture(e);
139 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700140 }
141
142 @Override
143 public <M> void multicast(M message,
144 MessageSubject subject,
145 Function<M, byte[]> encoder,
146 Set<NodeId> nodes) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900147 checkPermission(CLUSTER_WRITE);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700148 byte[] payload = new ClusterMessage(
Madan Jampani175e8fd2015-05-20 14:10:45 -0700149 localNodeId,
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700150 subject,
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800151 timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
152 .getBytes();
Madan Jampani175e8fd2015-05-20 14:10:45 -0700153 nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700154 }
155
156 @Override
157 public <M, R> CompletableFuture<R> sendAndReceive(M message,
158 MessageSubject subject,
159 Function<M, byte[]> encoder,
160 Function<byte[], R> decoder,
161 NodeId toNodeId) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900162 checkPermission(CLUSTER_WRITE);
Madan Jampani27b69c62015-05-15 15:49:02 -0700163 try {
164 ClusterMessage envelope = new ClusterMessage(
165 clusterService.getLocalNode().id(),
166 subject,
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800167 timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
168 apply(message));
169 return sendAndReceive(subject, envelope.getBytes(), toNodeId).
170 thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
Madan Jampani27b69c62015-05-15 15:49:02 -0700171 } catch (Exception e) {
172 return Tools.exceptionalFuture(e);
173 }
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700174 }
175
Madan Jampani175e8fd2015-05-20 14:10:45 -0700176 private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Yuta HIGUCHI7a8d4aa2014-10-07 17:37:11 -0700177 ControllerNode node = clusterService.getNode(toNodeId);
Madan Jampani890bc352014-10-01 22:35:29 -0700178 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800179 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Aaron Kruglikove2411892016-02-10 18:18:17 -0800180 MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX);
181 return messagingService.sendAsync(nodeEp, subject.value(), payload).whenComplete((r, e) -> context.stop(e));
Madan Jampani98c17602014-10-23 15:33:23 -0700182 }
183
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700184 private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
Madan Jampani4a9cb6d2014-10-17 10:48:50 -0700185 ControllerNode node = clusterService.getNode(toNodeId);
186 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Madan Jampani2e5f87b2015-02-22 10:37:15 -0800187 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
Aaron Kruglikove2411892016-02-10 18:18:17 -0800188 MeteringAgent.Context epContext = endpointMeteringAgent.
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800189 startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
Aaron Kruglikove2411892016-02-10 18:18:17 -0800190 MeteringAgent.Context subjectContext = subjectMeteringAgent.
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800191 startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
192 return messagingService.sendAndReceive(nodeEp, subject.value(), payload).
193 whenComplete((bytes, throwable) -> {
194 subjectContext.stop(throwable);
195 epContext.stop(throwable);
196 });
Madan Jampani890bc352014-10-01 22:35:29 -0700197 }
198
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800199 @Override
Madan Jampani2af244a2015-02-22 13:12:01 -0800200 public void addSubscriber(MessageSubject subject,
201 ClusterMessageHandler subscriber,
202 ExecutorService executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900203 checkPermission(CLUSTER_WRITE);
Madan Jampanic26eede2015-04-16 11:42:16 -0700204 messagingService.registerHandler(subject.value(),
205 new InternalClusterMessageHandler(subscriber),
206 executor);
Madan Jampani2af244a2015-02-22 13:12:01 -0800207 }
208
209 @Override
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800210 public void removeSubscriber(MessageSubject subject) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900211 checkPermission(CLUSTER_WRITE);
Yuta HIGUCHI76b54bf2014-11-07 01:56:55 -0800212 messagingService.unregisterHandler(subject.value());
213 }
214
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700215 @Override
216 public <M, R> void addSubscriber(MessageSubject subject,
217 Function<byte[], M> decoder,
218 Function<M, R> handler,
219 Function<R, byte[]> encoder,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700220 Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900221 checkPermission(CLUSTER_WRITE);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700222 messagingService.registerHandler(subject.value(),
Madan Jampani27b69c62015-05-15 15:49:02 -0700223 new InternalMessageResponder<M, R>(decoder, encoder, m -> {
224 CompletableFuture<R> responseFuture = new CompletableFuture<>();
225 executor.execute(() -> {
226 try {
227 responseFuture.complete(handler.apply(m));
228 } catch (Exception e) {
229 responseFuture.completeExceptionally(e);
230 }
231 });
232 return responseFuture;
233 }));
234 }
235
236 @Override
237 public <M, R> void addSubscriber(MessageSubject subject,
238 Function<byte[], M> decoder,
239 Function<M, CompletableFuture<R>> handler,
240 Function<R, byte[]> encoder) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900241 checkPermission(CLUSTER_WRITE);
Madan Jampani27b69c62015-05-15 15:49:02 -0700242 messagingService.registerHandler(subject.value(),
243 new InternalMessageResponder<>(decoder, encoder, handler));
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700244 }
245
246 @Override
247 public <M> void addSubscriber(MessageSubject subject,
248 Function<byte[], M> decoder,
249 Consumer<M> handler,
Madan Jampaniec5ae342015-04-13 15:43:10 -0700250 Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900251 checkPermission(CLUSTER_WRITE);
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700252 messagingService.registerHandler(subject.value(),
253 new InternalMessageConsumer<>(decoder, handler),
254 executor);
255 }
256
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800257 /**
258 * Performs the timed function, returning the value it would while timing the operation.
259 *
260 * @param timedFunction the function to be timed
261 * @param meter the metering agent to be used to time the function
262 * @param opName the opname to be used when starting the meter
263 * @param <A> The param type of the function
264 * @param <B> The return type of the function
265 * @return the value returned by the timed function
266 */
267 private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
268 MeteringAgent meter, String opName) {
269 checkNotNull(timedFunction);
270 checkNotNull(meter);
271 checkNotNull(opName);
272 return new Function<A, B>() {
273 @Override
274 public B apply(A a) {
275 final MeteringAgent.Context context = meter.startTimer(opName);
276 B result = null;
277 try {
278 result = timedFunction.apply(a);
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800279 context.stop(null);
280 return result;
Aaron Kruglikove2411892016-02-10 18:18:17 -0800281 } catch (Exception e) {
282 context.stop(e);
283 Throwables.propagate(e);
284 return null;
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800285 }
286 }
287 };
288 }
289
290
Madan Jampanid36def02016-01-13 11:21:56 -0800291 private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
Madan Jampanic26eede2015-04-16 11:42:16 -0700292 private ClusterMessageHandler handler;
293
294 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
295 this.handler = handler;
296 }
297
298 @Override
Madan Jampanid36def02016-01-13 11:21:56 -0800299 public byte[] apply(Endpoint sender, byte[] bytes) {
Madan Jampanic26eede2015-04-16 11:42:16 -0700300 ClusterMessage message = ClusterMessage.fromBytes(bytes);
301 handler.handle(message);
302 return message.response();
303 }
304 }
305
Madan Jampanid36def02016-01-13 11:21:56 -0800306 private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700307 private final Function<byte[], M> decoder;
308 private final Function<R, byte[]> encoder;
Madan Jampani27b69c62015-05-15 15:49:02 -0700309 private final Function<M, CompletableFuture<R>> handler;
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700310
311 public InternalMessageResponder(Function<byte[], M> decoder,
312 Function<R, byte[]> encoder,
Madan Jampani27b69c62015-05-15 15:49:02 -0700313 Function<M, CompletableFuture<R>> handler) {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700314 this.decoder = decoder;
315 this.encoder = encoder;
316 this.handler = handler;
317 }
Madan Jampanic26eede2015-04-16 11:42:16 -0700318
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700319 @Override
Madan Jampanid36def02016-01-13 11:21:56 -0800320 public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800321 return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
322 apply(ClusterMessage.fromBytes(bytes).payload())).
323 thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700324 }
325 }
326
Madan Jampanid36def02016-01-13 11:21:56 -0800327 private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
Madan Jampani2bfa94c2015-04-11 05:03:49 -0700328 private final Function<byte[], M> decoder;
329 private final Consumer<M> consumer;
330
331 public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
332 this.decoder = decoder;
333 this.consumer = consumer;
334 }
Madan Jampani8a895092014-10-17 16:55:50 -0700335
336 @Override
Madan Jampanid36def02016-01-13 11:21:56 -0800337 public void accept(Endpoint sender, byte[] bytes) {
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -0800338 consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
339 apply(ClusterMessage.fromBytes(bytes).payload()));
Madan Jampani8a895092014-10-17 16:55:50 -0700340 }
341 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800342}