blob: c273f436a69f706866a8aa7ae843fb60ebd4395c [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.cluster.impl;
17
Jordan Halterman43300782019-05-21 11:27:50 -070018import java.time.Duration;
Jordan Haltermanb8cace72018-07-02 17:39:21 -070019import java.util.Map;
20import java.util.Set;
21import java.util.concurrent.CompletableFuture;
22import java.util.concurrent.Executor;
23import java.util.concurrent.ExecutorService;
24import java.util.function.Consumer;
25import java.util.function.Function;
26
27import com.google.common.collect.Maps;
28import org.onlab.util.Tools;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
31import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
32import org.onosproject.store.cluster.messaging.MessageSubject;
33import org.onosproject.store.cluster.messaging.MessagingException;
34
35/**
36 * Cluster communication service implementation used for testing.
37 */
38public class TestClusterCommunicationService implements ClusterCommunicationService {
39 private final NodeId localNodeId;
40 private final Map<NodeId, TestClusterCommunicationService> nodes;
41 private final Map<MessageSubject, Function<byte[], CompletableFuture<byte[]>>> subscribers =
42 Maps.newConcurrentMap();
43
44 public TestClusterCommunicationService(NodeId localNodeId, Map<NodeId, TestClusterCommunicationService> nodes) {
45 this.localNodeId = localNodeId;
46 this.nodes = nodes;
47 nodes.put(localNodeId, this);
48 }
49
50 @Override
51 public <M> void broadcast(M message, MessageSubject subject, Function<M, byte[]> encoder) {
52 nodes.forEach((nodeId, node) -> {
53 if (!nodeId.equals(localNodeId)) {
54 node.handle(subject, encoder.apply(message));
55 }
56 });
57 }
58
59 @Override
60 public <M> void broadcastIncludeSelf(M message, MessageSubject subject, Function<M, byte[]> encoder) {
61 nodes.values().forEach(node -> node.handle(subject, encoder.apply(message)));
62 }
63
64 @Override
65 public <M> CompletableFuture<Void> unicast(
66 M message, MessageSubject subject, Function<M, byte[]> encoder, NodeId toNodeId) {
67 TestClusterCommunicationService node = nodes.get(toNodeId);
68 if (node != null) {
69 node.handle(subject, encoder.apply(message));
70 }
71 return CompletableFuture.completedFuture(null);
72 }
73
74 @Override
75 public <M> void multicast(M message, MessageSubject subject, Function<M, byte[]> encoder, Set<NodeId> nodeIds) {
76 nodes.entrySet().stream()
77 .filter(e -> nodeIds.contains(e.getKey()))
78 .forEach(e -> e.getValue().handle(subject, encoder.apply(message)));
79 }
80
81 @Override
82 public <M, R> CompletableFuture<R> sendAndReceive(
83 M message,
84 MessageSubject subject,
85 Function<M, byte[]> encoder,
86 Function<byte[], R> decoder,
Jordan Halterman43300782019-05-21 11:27:50 -070087 NodeId toNodeId,
88 Duration duration) {
Jordan Haltermanb8cace72018-07-02 17:39:21 -070089 TestClusterCommunicationService node = nodes.get(toNodeId);
90 if (node == null) {
91 return Tools.exceptionalFuture(new MessagingException.NoRemoteHandler());
92 }
93 return node.handle(subject, encoder.apply(message)).thenApply(decoder);
94 }
95
96 private CompletableFuture<byte[]> handle(MessageSubject subject, byte[] message) {
97 Function<byte[], CompletableFuture<byte[]>> subscriber = subscribers.get(subject);
98 if (subscriber != null) {
99 return subscriber.apply(message);
100 }
101 return Tools.exceptionalFuture(new MessagingException.NoRemoteHandler());
102 }
103
104 private boolean isSubscriber(MessageSubject subject) {
105 return subscribers.containsKey(subject);
106 }
107
108 @Override
109 public <M, R> void addSubscriber(
110 MessageSubject subject,
111 Function<byte[], M> decoder,
112 Function<M, R> handler,
113 Function<R, byte[]> encoder,
114 Executor executor) {
115 subscribers.put(subject, message -> {
116 CompletableFuture<byte[]> future = new CompletableFuture<>();
117 executor.execute(() -> {
118 try {
119 future.complete(encoder.apply(handler.apply(decoder.apply(message))));
120 } catch (Exception e) {
121 future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
122 }
123 });
124 return future;
125 });
126 }
127
128 @Override
129 public <M, R> void addSubscriber(
130 MessageSubject subject,
131 Function<byte[], M> decoder,
132 Function<M, CompletableFuture<R>> handler,
133 Function<R, byte[]> encoder) {
134 subscribers.put(subject, message -> {
135 CompletableFuture<byte[]> future = new CompletableFuture<>();
136 try {
137 handler.apply(decoder.apply(message)).whenComplete((result, error) -> {
138 if (error == null) {
139 future.complete(encoder.apply(result));
140 } else {
141 future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
142 }
143 });
144 } catch (Exception e) {
145 future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
146 }
147 return future;
148 });
149 }
150
151 @Override
152 public <M> void addSubscriber(
153 MessageSubject subject,
154 Function<byte[], M> decoder,
155 Consumer<M> handler,
156 Executor executor) {
157 subscribers.put(subject, message -> {
158 CompletableFuture<byte[]> future = new CompletableFuture<>();
159 executor.execute(() -> {
160 try {
161 handler.accept(decoder.apply(message));
162 future.complete(null);
163 } catch (Exception e) {
164 future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
165 }
166 });
167 return future;
168 });
169 }
170
171 @Override
172 public void removeSubscriber(MessageSubject subject) {
173 subscribers.remove(subject);
174 }
175
176 @Override
177 public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor) {
178 throw new UnsupportedOperationException();
179 }
180}