Implement service for registering, managing, and operating on remote Java proxies over the cluster communication service
Change-Id: I4576e3554cfad08747eed847b73fe695e219f3b8
diff --git a/core/net/src/test/java/org/onosproject/cluster/impl/ProxyManagerTest.java b/core/net/src/test/java/org/onosproject/cluster/impl/ProxyManagerTest.java
new file mode 100644
index 0000000..0185c0d
--- /dev/null
+++ b/core/net/src/test/java/org/onosproject/cluster/impl/ProxyManagerTest.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.ProxyFactory;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Proxy manager test.
+ */
+public class ProxyManagerTest {
+ @Test
+ public void testProxyManager() throws Exception {
+ TestClusterCommunicationServiceFactory clusterCommunicatorFactory =
+ new TestClusterCommunicationServiceFactory();
+
+ NodeId a = NodeId.nodeId("a");
+ NodeId b = NodeId.nodeId("b");
+
+ Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
+
+ ProxyInterfaceImpl proxyInterface1 = new ProxyInterfaceImpl();
+ ProxyManager proxyManager1 = new ProxyManager();
+ proxyManager1.clusterCommunicator = clusterCommunicatorFactory.newCommunicationService(a);
+ proxyManager1.activate();
+ proxyManager1.registerProxyService(ProxyInterface.class, proxyInterface1, serializer);
+
+ ProxyInterfaceImpl proxyInterface2 = new ProxyInterfaceImpl();
+ ProxyManager proxyManager2 = new ProxyManager();
+ proxyManager2.clusterCommunicator = clusterCommunicatorFactory.newCommunicationService(b);
+ proxyManager2.activate();
+ proxyManager2.registerProxyService(ProxyInterface.class, proxyInterface2, serializer);
+
+ ProxyFactory<ProxyInterface> proxyFactory1 = proxyManager1.getProxyFactory(ProxyInterface.class, serializer);
+ assertEquals("Hello world!", proxyFactory1.getProxyFor(b).sync("Hello world!"));
+ assertEquals(1, proxyInterface2.syncCalls.get());
+ assertEquals("Hello world!", proxyFactory1.getProxyFor(b).async("Hello world!").join());
+ assertEquals(1, proxyInterface2.asyncCalls.get());
+
+ ProxyFactory<ProxyInterface> proxyFactory2 = proxyManager2.getProxyFactory(ProxyInterface.class, serializer);
+ assertEquals("Hello world!", proxyFactory2.getProxyFor(b).sync("Hello world!"));
+ assertEquals(2, proxyInterface2.syncCalls.get());
+ assertEquals("Hello world!", proxyFactory2.getProxyFor(b).async("Hello world!").join());
+ assertEquals(2, proxyInterface2.asyncCalls.get());
+
+ proxyManager1.deactivate();
+ proxyManager2.deactivate();
+ }
+
+ interface ProxyInterface {
+ String sync(String arg);
+ CompletableFuture<String> async(String arg);
+ }
+
+ class ProxyInterfaceImpl implements ProxyInterface {
+ private final AtomicInteger syncCalls = new AtomicInteger();
+ private final AtomicInteger asyncCalls = new AtomicInteger();
+
+ @Override
+ public String sync(String arg) {
+ syncCalls.incrementAndGet();
+ return arg;
+ }
+
+ @Override
+ public CompletableFuture<String> async(String arg) {
+ asyncCalls.incrementAndGet();
+ return CompletableFuture.completedFuture(arg);
+ }
+ }
+}
diff --git a/core/net/src/test/java/org/onosproject/cluster/impl/TestClusterCommunicationService.java b/core/net/src/test/java/org/onosproject/cluster/impl/TestClusterCommunicationService.java
new file mode 100644
index 0000000..680ddd0
--- /dev/null
+++ b/core/net/src/test/java/org/onosproject/cluster/impl/TestClusterCommunicationService.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster.impl;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import com.google.common.collect.Maps;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.cluster.messaging.MessagingException;
+
+/**
+ * Cluster communication service implementation used for testing.
+ */
+public class TestClusterCommunicationService implements ClusterCommunicationService {
+ private final NodeId localNodeId;
+ private final Map<NodeId, TestClusterCommunicationService> nodes;
+ private final Map<MessageSubject, Function<byte[], CompletableFuture<byte[]>>> subscribers =
+ Maps.newConcurrentMap();
+
+ public TestClusterCommunicationService(NodeId localNodeId, Map<NodeId, TestClusterCommunicationService> nodes) {
+ this.localNodeId = localNodeId;
+ this.nodes = nodes;
+ nodes.put(localNodeId, this);
+ }
+
+ @Override
+ public <M> void broadcast(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+ nodes.forEach((nodeId, node) -> {
+ if (!nodeId.equals(localNodeId)) {
+ node.handle(subject, encoder.apply(message));
+ }
+ });
+ }
+
+ @Override
+ public <M> void broadcastIncludeSelf(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+ nodes.values().forEach(node -> node.handle(subject, encoder.apply(message)));
+ }
+
+ @Override
+ public <M> CompletableFuture<Void> unicast(
+ M message, MessageSubject subject, Function<M, byte[]> encoder, NodeId toNodeId) {
+ TestClusterCommunicationService node = nodes.get(toNodeId);
+ if (node != null) {
+ node.handle(subject, encoder.apply(message));
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public <M> void multicast(M message, MessageSubject subject, Function<M, byte[]> encoder, Set<NodeId> nodeIds) {
+ nodes.entrySet().stream()
+ .filter(e -> nodeIds.contains(e.getKey()))
+ .forEach(e -> e.getValue().handle(subject, encoder.apply(message)));
+ }
+
+ @Override
+ public <M, R> CompletableFuture<R> sendAndReceive(
+ M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Function<byte[], R> decoder,
+ NodeId toNodeId) {
+ TestClusterCommunicationService node = nodes.get(toNodeId);
+ if (node == null) {
+ return Tools.exceptionalFuture(new MessagingException.NoRemoteHandler());
+ }
+ return node.handle(subject, encoder.apply(message)).thenApply(decoder);
+ }
+
+ private CompletableFuture<byte[]> handle(MessageSubject subject, byte[] message) {
+ Function<byte[], CompletableFuture<byte[]>> subscriber = subscribers.get(subject);
+ if (subscriber != null) {
+ return subscriber.apply(message);
+ }
+ return Tools.exceptionalFuture(new MessagingException.NoRemoteHandler());
+ }
+
+ private boolean isSubscriber(MessageSubject subject) {
+ return subscribers.containsKey(subject);
+ }
+
+ @Override
+ public <M, R> void addSubscriber(
+ MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, R> handler,
+ Function<R, byte[]> encoder,
+ Executor executor) {
+ subscribers.put(subject, message -> {
+ CompletableFuture<byte[]> future = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ future.complete(encoder.apply(handler.apply(decoder.apply(message))));
+ } catch (Exception e) {
+ future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+ }
+ });
+ return future;
+ });
+ }
+
+ @Override
+ public <M, R> void addSubscriber(
+ MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, CompletableFuture<R>> handler,
+ Function<R, byte[]> encoder) {
+ subscribers.put(subject, message -> {
+ CompletableFuture<byte[]> future = new CompletableFuture<>();
+ try {
+ handler.apply(decoder.apply(message)).whenComplete((result, error) -> {
+ if (error == null) {
+ future.complete(encoder.apply(result));
+ } else {
+ future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+ }
+ });
+ } catch (Exception e) {
+ future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+ }
+ return future;
+ });
+ }
+
+ @Override
+ public <M> void addSubscriber(
+ MessageSubject subject,
+ Function<byte[], M> decoder,
+ Consumer<M> handler,
+ Executor executor) {
+ subscribers.put(subject, message -> {
+ CompletableFuture<byte[]> future = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ handler.accept(decoder.apply(message));
+ future.complete(null);
+ } catch (Exception e) {
+ future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+ }
+ });
+ return future;
+ });
+ }
+
+ @Override
+ public void removeSubscriber(MessageSubject subject) {
+ subscribers.remove(subject);
+ }
+
+ @Override
+ public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/core/net/src/test/java/org/onosproject/cluster/impl/TestClusterCommunicationServiceFactory.java b/core/net/src/test/java/org/onosproject/cluster/impl/TestClusterCommunicationServiceFactory.java
new file mode 100644
index 0000000..1205506
--- /dev/null
+++ b/core/net/src/test/java/org/onosproject/cluster/impl/TestClusterCommunicationServiceFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster.impl;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+
+/**
+ * Test cluster communication service factory.
+ */
+public class TestClusterCommunicationServiceFactory {
+ private final Map<NodeId, TestClusterCommunicationService> nodes = Maps.newConcurrentMap();
+
+ /**
+ * Creates a new cluster communication service for the given node.
+ *
+ * @param localNodeId the node for which to create the service
+ * @return the communication service for the given node
+ */
+ public ClusterCommunicationService newCommunicationService(NodeId localNodeId) {
+ return new TestClusterCommunicationService(localNodeId, nodes);
+ }
+}