blob: f605289cb9eed2c2ec1f553635a23f7273b36dc3 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
Jordan Halterman28183ee2017-10-17 17:29:10 -07002 * Copyright 2017-present Open Networking Foundation
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani890bc352014-10-01 22:35:29 -070017
Jordan Halterman28183ee2017-10-17 17:29:10 -070018import com.google.common.base.Objects;
19import com.google.common.base.Throwables;
20import org.apache.felix.scr.annotations.Activate;
Madan Jampani890bc352014-10-01 22:35:29 -070021import org.apache.felix.scr.annotations.Component;
Jordan Halterman28183ee2017-10-17 17:29:10 -070022import org.apache.felix.scr.annotations.Deactivate;
Madan Jampania5d0d782014-10-07 14:36:00 -070023import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
Madan Jampani890bc352014-10-01 22:35:29 -070025import org.apache.felix.scr.annotations.Service;
Jordan Halterman28183ee2017-10-17 17:29:10 -070026import org.onlab.util.Tools;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.ControllerNode;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
31import org.onosproject.store.cluster.messaging.ClusterMessage;
32import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
33import org.onosproject.store.cluster.messaging.Endpoint;
Brian O'Connorabafb502014-12-02 22:26:20 -080034import org.onosproject.store.cluster.messaging.MessageSubject;
Jordan Halterman28183ee2017-10-17 17:29:10 -070035import org.onosproject.store.cluster.messaging.MessagingService;
36import org.onosproject.utils.MeteringAgent;
37import org.slf4j.Logger;
38import org.slf4j.LoggerFactory;
39
Ray Milkey6a51cb92018-03-06 09:03:03 -080040import java.util.Set;
41import java.util.concurrent.CompletableFuture;
42import java.util.concurrent.Executor;
43import java.util.concurrent.ExecutorService;
44import java.util.function.BiConsumer;
45import java.util.function.BiFunction;
46import java.util.function.Consumer;
47import java.util.function.Function;
48import java.util.stream.Collectors;
49
Jordan Halterman28183ee2017-10-17 17:29:10 -070050import static com.google.common.base.Preconditions.checkArgument;
51import static com.google.common.base.Preconditions.checkNotNull;
52import static org.onosproject.security.AppGuard.checkPermission;
53import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
Madan Jampani24f9efb2014-10-24 18:56:23 -070054
Madan Jampani890bc352014-10-01 22:35:29 -070055@Component(immediate = true)
56@Service
Jordan Halterman28183ee2017-10-17 17:29:10 -070057public class ClusterCommunicationManager implements ClusterCommunicationService {
Madan Jampani890bc352014-10-01 22:35:29 -070058
Jordan Halterman28183ee2017-10-17 17:29:10 -070059 private final Logger log = LoggerFactory.getLogger(getClass());
60
61 private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
62 private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
63
64 private static final String PRIMITIVE_NAME = "clusterCommunication";
65 private static final String SUBJECT_PREFIX = "subject";
66 private static final String ENDPOINT_PREFIX = "endpoint";
67
68 private static final String SERIALIZING = "serialization";
69 private static final String DESERIALIZING = "deserialization";
70 private static final String NODE_PREFIX = "node:";
71 private static final String ROUND_TRIP_SUFFIX = ".rtt";
72 private static final String ONE_WAY_SUFFIX = ".oneway";
Aaron Kruglikov1110b2c2016-02-02 16:24:37 -080073
Madan Jampania5d0d782014-10-07 14:36:00 -070074 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Halterman28183ee2017-10-17 17:29:10 -070075 protected ClusterService clusterService;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected MessagingService messagingService;
79
80 private NodeId localNodeId;
81
82 @Activate
83 public void activate() {
84 localNodeId = clusterService.getLocalNode().id();
85 log.info("Started");
86 }
87
88 @Deactivate
89 public void deactivate() {
90 log.info("Stopped");
91 }
Madan Jampani890bc352014-10-01 22:35:29 -070092
93 @Override
Jordan Halterman28183ee2017-10-17 17:29:10 -070094 public <M> void broadcast(M message,
95 MessageSubject subject,
96 Function<M, byte[]> encoder) {
97 checkPermission(CLUSTER_WRITE);
98 multicast(message,
99 subject,
100 encoder,
101 clusterService.getNodes()
102 .stream()
103 .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
104 .map(ControllerNode::id)
105 .collect(Collectors.toSet()));
106 }
107
108 @Override
109 public <M> void broadcastIncludeSelf(M message,
110 MessageSubject subject,
111 Function<M, byte[]> encoder) {
112 checkPermission(CLUSTER_WRITE);
113 multicast(message,
114 subject,
115 encoder,
116 clusterService.getNodes()
117 .stream()
118 .map(ControllerNode::id)
119 .collect(Collectors.toSet()));
120 }
121
122 @Override
123 public <M> CompletableFuture<Void> unicast(M message,
124 MessageSubject subject,
125 Function<M, byte[]> encoder,
126 NodeId toNodeId) {
127 checkPermission(CLUSTER_WRITE);
128 try {
129 byte[] payload = new ClusterMessage(
130 localNodeId,
131 subject,
132 timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
133 ).getBytes();
134 return doUnicast(subject, payload, toNodeId);
135 } catch (Exception e) {
136 return Tools.exceptionalFuture(e);
137 }
138 }
139
140 @Override
141 public <M> void multicast(M message,
142 MessageSubject subject,
143 Function<M, byte[]> encoder,
144 Set<NodeId> nodes) {
145 checkPermission(CLUSTER_WRITE);
146 byte[] payload = new ClusterMessage(
147 localNodeId,
148 subject,
149 timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
150 .getBytes();
151 nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
152 }
153
154 @Override
155 public <M, R> CompletableFuture<R> sendAndReceive(M message,
156 MessageSubject subject,
157 Function<M, byte[]> encoder,
158 Function<byte[], R> decoder,
159 NodeId toNodeId) {
160 checkPermission(CLUSTER_WRITE);
161 try {
162 ClusterMessage envelope = new ClusterMessage(
163 clusterService.getLocalNode().id(),
164 subject,
165 timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
166 apply(message));
167 return sendAndReceive(subject, envelope.getBytes(), toNodeId).
168 thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
169 } catch (Exception e) {
170 return Tools.exceptionalFuture(e);
171 }
172 }
173
174 private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
175 ControllerNode node = clusterService.getNode(toNodeId);
176 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
177 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
178 MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX);
179 return messagingService.sendAsync(nodeEp, subject.toString(), payload).whenComplete((r, e) -> context.stop(e));
180 }
181
182 private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
183 ControllerNode node = clusterService.getNode(toNodeId);
184 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
185 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
186 MeteringAgent.Context epContext = endpointMeteringAgent.
187 startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
188 MeteringAgent.Context subjectContext = subjectMeteringAgent.
189 startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
190 return messagingService.sendAndReceive(nodeEp, subject.toString(), payload).
191 whenComplete((bytes, throwable) -> {
192 subjectContext.stop(throwable);
193 epContext.stop(throwable);
194 });
195 }
196
197 @Override
198 public void addSubscriber(MessageSubject subject,
199 ClusterMessageHandler subscriber,
200 ExecutorService executor) {
201 checkPermission(CLUSTER_WRITE);
202 messagingService.registerHandler(subject.toString(),
203 new InternalClusterMessageHandler(subscriber),
204 executor);
205 }
206
207 @Override
208 public void removeSubscriber(MessageSubject subject) {
209 checkPermission(CLUSTER_WRITE);
210 messagingService.unregisterHandler(subject.toString());
211 }
212
213 @Override
214 public <M, R> void addSubscriber(MessageSubject subject,
215 Function<byte[], M> decoder,
216 Function<M, R> handler,
217 Function<R, byte[]> encoder,
218 Executor executor) {
219 checkPermission(CLUSTER_WRITE);
220 messagingService.registerHandler(subject.toString(),
221 new InternalMessageResponder<M, R>(decoder, encoder, m -> {
222 CompletableFuture<R> responseFuture = new CompletableFuture<>();
223 executor.execute(() -> {
224 try {
225 responseFuture.complete(handler.apply(m));
226 } catch (Exception e) {
227 responseFuture.completeExceptionally(e);
228 }
229 });
230 return responseFuture;
231 }));
232 }
233
234 @Override
235 public <M, R> void addSubscriber(MessageSubject subject,
236 Function<byte[], M> decoder,
237 Function<M, CompletableFuture<R>> handler,
238 Function<R, byte[]> encoder) {
239 checkPermission(CLUSTER_WRITE);
240 messagingService.registerHandler(subject.toString(),
241 new InternalMessageResponder<>(decoder, encoder, handler));
242 }
243
244 @Override
245 public <M> void addSubscriber(MessageSubject subject,
246 Function<byte[], M> decoder,
247 Consumer<M> handler,
248 Executor executor) {
249 checkPermission(CLUSTER_WRITE);
250 messagingService.registerHandler(subject.toString(),
251 new InternalMessageConsumer<>(decoder, handler),
252 executor);
253 }
254
255 /**
256 * Performs the timed function, returning the value it would while timing the operation.
257 *
258 * @param timedFunction the function to be timed
259 * @param meter the metering agent to be used to time the function
260 * @param opName the opname to be used when starting the meter
261 * @param <A> The param type of the function
262 * @param <B> The return type of the function
263 * @return the value returned by the timed function
264 */
265 private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
266 MeteringAgent meter, String opName) {
267 checkNotNull(timedFunction);
268 checkNotNull(meter);
269 checkNotNull(opName);
270 return new Function<A, B>() {
271 @Override
272 public B apply(A a) {
273 final MeteringAgent.Context context = meter.startTimer(opName);
274 B result = null;
275 try {
276 result = timedFunction.apply(a);
277 context.stop(null);
278 return result;
279 } catch (Exception e) {
280 context.stop(e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800281 Throwables.throwIfUnchecked(e.getCause());
282 throw new IllegalStateException(e.getCause());
Jordan Halterman28183ee2017-10-17 17:29:10 -0700283 }
284 }
285 };
286 }
287
288
289 private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
290 private ClusterMessageHandler handler;
291
292 public InternalClusterMessageHandler(ClusterMessageHandler handler) {
293 this.handler = handler;
294 }
295
296 @Override
297 public byte[] apply(Endpoint sender, byte[] bytes) {
298 ClusterMessage message = ClusterMessage.fromBytes(bytes);
299 handler.handle(message);
300 return message.response();
301 }
302 }
303
304 private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
305 private final Function<byte[], M> decoder;
306 private final Function<R, byte[]> encoder;
307 private final Function<M, CompletableFuture<R>> handler;
308
309 public InternalMessageResponder(Function<byte[], M> decoder,
310 Function<R, byte[]> encoder,
311 Function<M, CompletableFuture<R>> handler) {
312 this.decoder = decoder;
313 this.encoder = encoder;
314 this.handler = handler;
315 }
316
317 @Override
318 public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
319 return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
320 apply(ClusterMessage.fromBytes(bytes).payload())).
321 thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
322 }
323 }
324
325 private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
326 private final Function<byte[], M> decoder;
327 private final Consumer<M> consumer;
328
329 public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
330 this.decoder = decoder;
331 this.consumer = consumer;
332 }
333
334 @Override
335 public void accept(Endpoint sender, byte[] bytes) {
336 consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
337 apply(ClusterMessage.fromBytes(bytes).payload()));
338 }
Madan Jampani8a895092014-10-17 16:55:50 -0700339 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800340}