blob: b7a1b89270fcbdf5c27c4afcc4b10e50ca6eb780 [file] [log] [blame]
Jordan Halterman2bf177c2017-06-29 01:49:08 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Jordan Halterman2bf177c2017-06-29 01:49:08 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.resources.impl;
17
18import java.util.Map;
19import java.util.Set;
20import java.util.concurrent.CompletableFuture;
21import java.util.concurrent.Executor;
22import java.util.concurrent.ExecutorService;
23import java.util.function.Consumer;
24import java.util.function.Function;
25
26import com.google.common.collect.Maps;
27import io.atomix.utils.concurrent.Futures;
28import org.onosproject.cluster.NodeId;
29import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
30import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
31import org.onosproject.store.cluster.messaging.MessageSubject;
32import org.onosproject.store.cluster.messaging.MessagingException;
33
34/**
35 * Cluster communication service implementation used for testing.
36 */
37public class TestClusterCommunicationService implements ClusterCommunicationService {
38 private final NodeId localNodeId;
39 private final Map<NodeId, TestClusterCommunicationService> nodes;
40 private final Map<MessageSubject, Function<byte[], CompletableFuture<byte[]>>> subscribers =
41 Maps.newConcurrentMap();
42
43 public TestClusterCommunicationService(NodeId localNodeId, Map<NodeId, TestClusterCommunicationService> nodes) {
44 this.localNodeId = localNodeId;
45 this.nodes = nodes;
46 nodes.put(localNodeId, this);
47 }
48
49 @Override
50 public <M> void broadcast(M message, MessageSubject subject, Function<M, byte[]> encoder) {
51 nodes.forEach((nodeId, node) -> {
52 if (!nodeId.equals(localNodeId)) {
53 node.handle(subject, encoder.apply(message));
54 }
55 });
56 }
57
58 @Override
59 public <M> void broadcastIncludeSelf(M message, MessageSubject subject, Function<M, byte[]> encoder) {
60 nodes.values().forEach(node -> node.handle(subject, encoder.apply(message)));
61 }
62
63 @Override
64 public <M> CompletableFuture<Void> unicast(
65 M message, MessageSubject subject, Function<M, byte[]> encoder, NodeId toNodeId) {
66 TestClusterCommunicationService node = nodes.get(toNodeId);
67 if (node != null) {
68 node.handle(subject, encoder.apply(message));
69 }
70 return CompletableFuture.completedFuture(null);
71 }
72
73 @Override
74 public <M> void multicast(M message, MessageSubject subject, Function<M, byte[]> encoder, Set<NodeId> nodeIds) {
Yuta HIGUCHIfbd9ae92018-01-24 23:39:06 -080075 nodes.entrySet().stream()
76 .filter(e -> nodeIds.contains(e.getKey()))
77 .forEach(e -> e.getValue().handle(subject, encoder.apply(message)));
Jordan Halterman2bf177c2017-06-29 01:49:08 -070078 }
79
80 @Override
81 public <M, R> CompletableFuture<R> sendAndReceive(
82 M message,
83 MessageSubject subject,
84 Function<M, byte[]> encoder,
85 Function<byte[], R> decoder,
86 NodeId toNodeId) {
87 TestClusterCommunicationService node = nodes.get(toNodeId);
88 if (node == null) {
89 return Futures.exceptionalFuture(new MessagingException.NoRemoteHandler());
90 }
91 return node.handle(subject, encoder.apply(message)).thenApply(decoder);
92 }
93
94 private CompletableFuture<byte[]> handle(MessageSubject subject, byte[] message) {
95 Function<byte[], CompletableFuture<byte[]>> subscriber = subscribers.get(subject);
96 if (subscriber != null) {
97 return subscriber.apply(message);
98 }
99 return Futures.exceptionalFuture(new MessagingException.NoRemoteHandler());
100 }
101
102 private boolean isSubscriber(MessageSubject subject) {
103 return subscribers.containsKey(subject);
104 }
105
106 @Override
107 public <M, R> void addSubscriber(
108 MessageSubject subject,
109 Function<byte[], M> decoder,
110 Function<M, R> handler,
111 Function<R, byte[]> encoder,
112 Executor executor) {
113 subscribers.put(subject, message -> {
114 CompletableFuture<byte[]> future = new CompletableFuture<>();
115 executor.execute(() -> {
116 try {
117 future.complete(encoder.apply(handler.apply(decoder.apply(message))));
118 } catch (Exception e) {
119 future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
120 }
121 });
122 return future;
123 });
124 }
125
126 @Override
127 public <M, R> void addSubscriber(
128 MessageSubject subject,
129 Function<byte[], M> decoder,
Jordan Halterman931d3e72018-02-27 10:48:50 -0800130 Function<M, CompletableFuture<R>> handler,
131 Function<R, byte[]> encoder) {
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700132 subscribers.put(subject, message -> {
133 CompletableFuture<byte[]> future = new CompletableFuture<>();
134 try {
135 handler.apply(decoder.apply(message)).whenComplete((result, error) -> {
136 if (error == null) {
137 future.complete(encoder.apply(result));
138 } else {
139 future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
140 }
141 });
142 } catch (Exception e) {
143 future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
144 }
145 return future;
146 });
147 }
148
149 @Override
150 public <M> void addSubscriber(
151 MessageSubject subject,
152 Function<byte[], M> decoder,
153 Consumer<M> handler,
154 Executor executor) {
155 subscribers.put(subject, message -> {
Jordan Halterman931d3e72018-02-27 10:48:50 -0800156 CompletableFuture<byte[]> future = new CompletableFuture<>();
157 executor.execute(() -> {
158 try {
159 handler.accept(decoder.apply(message));
160 future.complete(null);
161 } catch (Exception e) {
162 future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
163 }
164 });
165 return future;
Jordan Halterman2bf177c2017-06-29 01:49:08 -0700166 });
167 }
168
169 @Override
170 public void removeSubscriber(MessageSubject subject) {
171 subscribers.remove(subject);
172 }
173
174 @Override
175 public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor) {
176 throw new UnsupportedOperationException();
177 }
178}