blob: 4b7a80a68458d12f95976291a9078e1d38428e74 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
Jordan Halterman28183ee2017-10-17 17:29:10 -07002 * Copyright 2017-present Open Networking Foundation
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani890bc352014-10-01 22:35:29 -070017
Jordan Halterman28183ee2017-10-17 17:29:10 -070018import com.google.common.base.Objects;
19import com.google.common.base.Throwables;
Jordan Halterman28183ee2017-10-17 17:29:10 -070020import org.onlab.util.Tools;
21import org.onosproject.cluster.ClusterService;
22import org.onosproject.cluster.ControllerNode;
23import org.onosproject.cluster.NodeId;
24import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
25import org.onosproject.store.cluster.messaging.ClusterMessage;
26import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
27import org.onosproject.store.cluster.messaging.Endpoint;
Brian O'Connorabafb502014-12-02 22:26:20 -080028import org.onosproject.store.cluster.messaging.MessageSubject;
Jordan Halterman28183ee2017-10-17 17:29:10 -070029import org.onosproject.store.cluster.messaging.MessagingService;
30import org.onosproject.utils.MeteringAgent;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070031import org.osgi.service.component.annotations.Activate;
32import org.osgi.service.component.annotations.Component;
33import org.osgi.service.component.annotations.Deactivate;
34import org.osgi.service.component.annotations.Reference;
35import org.osgi.service.component.annotations.ReferenceCardinality;
Jordan Halterman28183ee2017-10-17 17:29:10 -070036import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38
Ray Milkey6a51cb92018-03-06 09:03:03 -080039import java.util.Set;
40import java.util.concurrent.CompletableFuture;
41import java.util.concurrent.Executor;
42import java.util.concurrent.ExecutorService;
43import java.util.function.BiConsumer;
44import java.util.function.BiFunction;
45import java.util.function.Consumer;
46import java.util.function.Function;
47import java.util.stream.Collectors;
48
Jordan Halterman28183ee2017-10-17 17:29:10 -070049import static com.google.common.base.Preconditions.checkArgument;
50import static com.google.common.base.Preconditions.checkNotNull;
51import static org.onosproject.security.AppGuard.checkPermission;
52import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
Madan Jampani24f9efb2014-10-24 18:56:23 -070053
Ray Milkeyd84f89b2018-08-17 14:54:17 -070054@Component(immediate = true, service = ClusterCommunicationService.class)
Jordan Halterman28183ee2017-10-17 17:29:10 -070055public class ClusterCommunicationManager implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070056
Jordan Halterman28183ee2017-10-17 17:29:10 -070057 private final Logger log = LoggerFactory.getLogger(getClass());
58
Ray Milkeyd84f89b2018-08-17 14:54:17 -070059 private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, false);
60 private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, false);
Jordan Halterman28183ee2017-10-17 17:29:10 -070061
62 private static final String PRIMITIVE_NAME = "clusterCommunication";
63 private static final String SUBJECT_PREFIX = "subject";
64 private static final String ENDPOINT_PREFIX = "endpoint";
65
66 private static final String SERIALIZING = "serialization";
67 private static final String DESERIALIZING = "deserialization";
68 private static final String NODE_PREFIX = "node:";
69 private static final String ROUND_TRIP_SUFFIX = ".rtt";
70 private static final String ONE_WAY_SUFFIX = ".oneway";
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -080071
Ray Milkeyd84f89b2018-08-17 14:54:17 -070072 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Halterman28183ee2017-10-17 17:29:10 -070073 protected ClusterService clusterService;
74
Ray Milkeyd84f89b2018-08-17 14:54:17 -070075 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jordan Halterman28183ee2017-10-17 17:29:10 -070076 protected MessagingService messagingService;
77
78 private NodeId localNodeId;
79
80 @Activate
81 public void activate() {
82 localNodeId = clusterService.getLocalNode().id();
83 log.info("Started");
84 }
85
86 @Deactivate
87 public void deactivate() {
88 log.info("Stopped");
89 }
Madan Jampani890bc352014-10-01 22:35:29 -070090
91 @Override
Jordan Halterman28183ee2017-10-17 17:29:10 -070092 public <M> void broadcast(M message,
93 MessageSubject subject,
94 Function<M, byte[]> encoder) {
95 checkPermission(CLUSTER_WRITE);
96 multicast(message,
97 subject,
98 encoder,
99 clusterService.getNodes()
100 .stream()
101 .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
102 .map(ControllerNode::id)
103 .collect(Collectors.toSet()));
104 }
105
106 @Override
107 public <M> void broadcastIncludeSelf(M message,
108 MessageSubject subject,
109 Function<M, byte[]> encoder) {
110 checkPermission(CLUSTER_WRITE);
111 multicast(message,
112 subject,
113 encoder,
114 clusterService.getNodes()
115 .stream()
116 .map(ControllerNode::id)
117 .collect(Collectors.toSet()));
118 }
119
120 @Override
121 public <M> CompletableFuture<Void> unicast(M message,
122 MessageSubject subject,
123 Function<M, byte[]> encoder,
124 NodeId toNodeId) {
125 checkPermission(CLUSTER_WRITE);
126 try {
127 byte[] payload = new ClusterMessage(
128 localNodeId,
129 subject,
130 timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
131 ).getBytes();
132 return doUnicast(subject, payload, toNodeId);
133 } catch (Exception e) {
134 return Tools.exceptionalFuture(e);
135 }
136 }
137
138 @Override
139 public <M> void multicast(M message,
140 MessageSubject subject,
141 Function<M, byte[]> encoder,
142 Set<NodeId> nodes) {
143 checkPermission(CLUSTER_WRITE);
144 byte[] payload = new ClusterMessage(
145 localNodeId,
146 subject,
147 timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
148 .getBytes();
149 nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
150 }
151
152 @Override
153 public <M, R> CompletableFuture<R> sendAndReceive(M message,
154 MessageSubject subject,
155 Function<M, byte[]> encoder,
156 Function<byte[], R> decoder,
157 NodeId toNodeId) {
158 checkPermission(CLUSTER_WRITE);
159 try {
160 ClusterMessage envelope = new ClusterMessage(
161 clusterService.getLocalNode().id(),
162 subject,
163 timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
164 apply(message));
165 return sendAndReceive(subject, envelope.getBytes(), toNodeId).
166 thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
167 } catch (Exception e) {
168 return Tools.exceptionalFuture(e);
169 }
170 }
171
172 private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
173 ControllerNode node = clusterService.getNode(toNodeId);
174 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
175 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
176 MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX);
177 return messagingService.sendAsync(nodeEp, subject.toString(), payload).whenComplete((r, e) -> context.stop(e));
178 }
179
180 private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
181 ControllerNode node = clusterService.getNode(toNodeId);
182 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
183 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
184 MeteringAgent.Context epContext = endpointMeteringAgent.
185 startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
186 MeteringAgent.Context subjectContext = subjectMeteringAgent.
187 startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
188 return messagingService.sendAndReceive(nodeEp, subject.toString(), payload).
189 whenComplete((bytes, throwable) -> {
190 subjectContext.stop(throwable);
191 epContext.stop(throwable);
192 });
193 }
194
195 @Override
196 public void addSubscriber(MessageSubject subject,
197 ClusterMessageHandler subscriber,
198 ExecutorService executor) {
199 checkPermission(CLUSTER_WRITE);
200 messagingService.registerHandler(subject.toString(),
201 new InternalClusterMessageHandler(subscriber),
202 executor);
203 }
204
205 @Override
206 public void removeSubscriber(MessageSubject subject) {
207 checkPermission(CLUSTER_WRITE);
208 messagingService.unregisterHandler(subject.toString());
209 }
210
211 @Override
212 public <M, R> void addSubscriber(MessageSubject subject,
213 Function<byte[], M> decoder,
214 Function<M, R> handler,
215 Function<R, byte[]> encoder,
216 Executor executor) {
217 checkPermission(CLUSTER_WRITE);
218 messagingService.registerHandler(subject.toString(),
219 new InternalMessageResponder<M, R>(decoder, encoder, m -> {
220 CompletableFuture<R> responseFuture = new CompletableFuture<>();
221 executor.execute(() -> {
222 try {
223 responseFuture.complete(handler.apply(m));
224 } catch (Exception e) {
225 responseFuture.completeExceptionally(e);
226 }
227 });
228 return responseFuture;
229 }));
230 }
231
232 @Override
233 public <M, R> void addSubscriber(MessageSubject subject,
234 Function<byte[], M> decoder,
235 Function<M, CompletableFuture<R>> handler,
236 Function<R, byte[]> encoder) {
237 checkPermission(CLUSTER_WRITE);
238 messagingService.registerHandler(subject.toString(),
239 new InternalMessageResponder<>(decoder, encoder, handler));
240 }
241
242 @Override
243 public <M> void addSubscriber(MessageSubject subject,
244 Function<byte[], M> decoder,
245 Consumer<M> handler,
246 Executor executor) {
247 checkPermission(CLUSTER_WRITE);
248 messagingService.registerHandler(subject.toString(),
249 new InternalMessageConsumer<>(decoder, handler),
250 executor);
251 }
252
253 /**
254 * Performs the timed function, returning the value it would while timing the operation.
255 *
256 * @param timedFunction the function to be timed
257 * @param meter the metering agent to be used to time the function
258 * @param opName the opname to be used when starting the meter
259 * @param <A> The param type of the function
260 * @param <B> The return type of the function
261 * @return the value returned by the timed function
262 */
263 private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
264 MeteringAgent meter, String opName) {
265 checkNotNull(timedFunction);
266 checkNotNull(meter);
267 checkNotNull(opName);
268 return new Function<A, B>() {
269 @Override
270 public B apply(A a) {
271 final MeteringAgent.Context context = meter.startTimer(opName);
272 B result = null;
273 try {
274 result = timedFunction.apply(a);
275 context.stop(null);
276 return result;
277 } catch (Exception e) {
278 context.stop(e);
Jordan Haltermane76f0c82018-06-18 17:44:48 -0700279 Throwables.throwIfUnchecked(Throwables.getRootCause(e));
Ray Milkey6a51cb92018-03-06 09:03:03 -0800280 throw new IllegalStateException(e.getCause());
Jordan Halterman28183ee2017-10-17 17:29:10 -0700281 }
282 }
283 };
284 }
285
286
287 private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
288 private ClusterMessageHandler handler;
289
290 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
291 this.handler = handler;
292 }
293
294 @Override
295 public byte[] apply(Endpoint sender, byte[] bytes) {
296 ClusterMessage message = ClusterMessage.fromBytes(bytes);
297 handler.handle(message);
298 return message.response();
299 }
300 }
301
302 private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
303 private final Function<byte[], M> decoder;
304 private final Function<R, byte[]> encoder;
305 private final Function<M, CompletableFuture<R>> handler;
306
307 public InternalMessageResponder(Function<byte[], M> decoder,
308 Function<R, byte[]> encoder,
309 Function<M, CompletableFuture<R>> handler) {
310 this.decoder = decoder;
311 this.encoder = encoder;
312 this.handler = handler;
313 }
314
315 @Override
316 public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
317 return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
318 apply(ClusterMessage.fromBytes(bytes).payload())).
319 thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
320 }
321 }
322
323 private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
324 private final Function<byte[], M> decoder;
325 private final Consumer<M> consumer;
326
327 public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
328 this.decoder = decoder;
329 this.consumer = consumer;
330 }
331
332 @Override
333 public void accept(Endpoint sender, byte[] bytes) {
334 consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
335 apply(ClusterMessage.fromBytes(bytes).payload()));
336 }
Madan Jampani8a895092014-10-17 16:55:50 -0700337 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800338}