Revamped ClusterCommunicationService API
Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
index a979b65..8c160e8 100644
--- a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
+++ b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
@@ -219,7 +219,7 @@
private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) {
String data = String.format("%d|%f|%f", time, overallRate, currentRate);
- communicationService.broadcast(new ClusterMessage(nodeId, SAMPLE, data.getBytes()));
+ communicationService.broadcast(data, SAMPLE, str -> str.getBytes());
}
private class InternalSampleCollector implements ClusterMessageHandler {
diff --git a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
index 909f3a5..de9e9f2 100644
--- a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
+++ b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
@@ -249,14 +249,14 @@
public void start() {
if (stopped) {
stopped = false;
- communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, START.getBytes()));
+ communicationService.broadcast(START, CONTROL, str -> str.getBytes());
startTestRun();
}
}
public void stop() {
if (!stopped) {
- communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, STOP.getBytes()));
+ communicationService.broadcast(STOP, CONTROL, str -> str.getBytes());
stopTestRun();
}
}
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index cbf2398..59970f3 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -15,13 +15,16 @@
*/
package org.onosproject.store.cluster.messaging;
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
import org.onosproject.cluster.NodeId;
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
+import com.google.common.util.concurrent.ListenableFuture;
-// TODO: remove IOExceptions?
/**
* Service for assisting communications between controller cluster nodes.
*/
@@ -33,6 +36,7 @@
* @param message message to send
* @return true if the message was sent successfully to all nodes; false otherwise.
*/
+ @Deprecated
boolean broadcast(ClusterMessage message);
/**
@@ -41,6 +45,7 @@
* @param message message to send
* @return true if the message was sent successfully to all nodes; false otherwise.
*/
+ @Deprecated
boolean broadcastIncludeSelf(ClusterMessage message);
/**
@@ -50,6 +55,7 @@
* @param toNodeId node identifier
* @return true if the message was sent successfully; false otherwise.
*/
+ @Deprecated
boolean unicast(ClusterMessage message, NodeId toNodeId);
/**
@@ -59,6 +65,7 @@
* @param nodeIds recipient node identifiers
* @return true if the message was sent successfully to all nodes in the group; false otherwise.
*/
+ @Deprecated
boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds);
/**
@@ -66,18 +73,9 @@
* @param message message to send
* @param toNodeId recipient node identifier
* @return reply future.
- * @throws IOException when I/O exception of some sort has occurred
- */
- ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException;
-
- /**
- * Adds a new subscriber for the specified message subject.
- *
- * @param subject message subject
- * @param subscriber message subscriber
*/
@Deprecated
- void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber);
+ ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId);
/**
* Adds a new subscriber for the specified message subject.
@@ -86,13 +84,115 @@
* @param subscriber message subscriber
* @param executor executor to use for running handler.
*/
+ @Deprecated
void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor);
/**
+ * Broadcasts a message to all controller nodes.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding message to byte[]
+ * @param <M> message type
+ */
+ <M> void broadcast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder);
+
+ /**
+ * Broadcasts a message to all controller nodes including self.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding message to byte[]
+ * @param <M> message type
+ */
+ <M> void broadcastIncludeSelf(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder);
+
+ /**
+ * Sends a message to the specified controller node.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding message to byte[]
+ * @param toNodeId destination node identifier
+ * @param <M> message type
+ * @return true if the message was sent successfully; false otherwise
+ */
+ <M> boolean unicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ NodeId toNodeId);
+
+ /**
+ * Multicasts a message to a set of controller nodes.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding message to byte[]
+ * @param nodeIds recipient node identifiers
+ * @param <M> message type
+ */
+ <M> void multicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Set<NodeId> nodeIds);
+
+ /**
+ * Sends a message and expects a reply.
+ *
+ * @param message message to send
+ * @param subject message subject
+ * @param encoder function for encoding request to byte[]
+ * @param decoder function for decoding response from byte[]
+ * @param toNodeId recipient node identifier
+ * @param <M> request type
+ * @param <R> reply type
+ * @return reply future
+ */
+ <M, R> CompletableFuture<R> sendAndReceive(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Function<byte[], R> decoder,
+ NodeId toNodeId);
+
+ /**
+ * Adds a new subscriber for the specified message subject.
+ *
+ * @param subject message subject
+ * @param decoder decoder for resurrecting incoming message
+ * @param handler handler function that process the incoming message and produces a reply
+ * @param encoder encoder for serializing reply
+ * @param executor executor to run this handler on
+ * @param <M> incoming message type
+ * @param <R> reply message type
+ */
+ <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, R> handler,
+ Function<R, byte[]> encoder,
+ ExecutorService executor);
+
+ /**
+ * Adds a new subscriber for the specified message subject.
+ *
+ * @param subject message subject
+ * @param decoder decoder to resurrecting incoming message
+ * @param handler handler for handling message
+ * @param executor executor to run this handler on
+ * @param <M> incoming message type
+ */
+ <M> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Consumer<M> handler,
+ ExecutorService executor);
+
+ /**
* Removes a subscriber for the specified message subject.
*
- * @param subject message subject
+ * @param subject message subject
*/
void removeSubscriber(MessageSubject subject);
-
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 5c1fc33..7e7ec06 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -17,7 +17,6 @@
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListenableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -54,13 +53,14 @@
import org.slf4j.Logger;
import java.io.ByteArrayInputStream;
-import java.io.IOException;
import java.io.InputStream;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+
import static com.google.common.io.ByteStreams.toByteArray;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.groupedThreads;
@@ -351,22 +351,34 @@
*/
private void fetchBits(Application app) {
ControllerNode localNode = clusterService.getLocalNode();
- ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
- app.id().name().getBytes(Charsets.UTF_8));
- //Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
CountDownLatch latch = new CountDownLatch(1);
// FIXME: send message with name & version to make sure we don't get served old bits
log.info("Downloading bits for application {}", app.id().name());
for (ControllerNode node : clusterService.getNodes()) {
- try {
- ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
- future.addListener(new InternalBitListener(app, node, future, latch), executor);
- } catch (IOException e) {
- log.debug("Unable to request bits for application {} from node {}",
- app.id().name(), node.id());
+ if (latch.getCount() == 0) {
+ break;
}
+ if (node.equals(localNode)) {
+ continue;
+ }
+ clusterCommunicator.sendAndReceive(app.id().name(),
+ APP_BITS_REQUEST,
+ s -> s.getBytes(Charsets.UTF_8),
+ Function.identity(),
+ node.id())
+ .whenCompleteAsync((bits, error) -> {
+ if (error == null && latch.getCount() > 0) {
+ saveApplication(new ByteArrayInputStream(bits));
+ log.info("Downloaded bits for application {} from node {}",
+ app.id().name(), node.id());
+ latch.countDown();
+ } else if (error != null) {
+ log.warn("Unable to fetch bits for application {} from node {}",
+ app.id().name(), node.id(), error);
+ }
+ }, executor);
}
try {
@@ -392,41 +404,6 @@
}
}
}
-
- /**
- * Processes completed fetch requests.
- */
- private class InternalBitListener implements Runnable {
- private final Application app;
- private final ControllerNode node;
- private final ListenableFuture<byte[]> future;
- private final CountDownLatch latch;
-
- public InternalBitListener(Application app, ControllerNode node,
- ListenableFuture<byte[]> future, CountDownLatch latch) {
- this.app = app;
- this.node = node;
- this.future = future;
- this.latch = latch;
- }
-
- @Override
- public void run() {
- if (latch.getCount() > 0 && !future.isCancelled()) {
- try {
- byte[] bits = future.get(1, MILLISECONDS);
- saveApplication(new ByteArrayInputStream(bits));
- log.info("Downloaded bits for application {} from node {}",
- app.id().name(), node.id());
- latch.countDown();
- } catch (Exception e) {
- log.warn("Unable to fetch bits for application {} from node {}",
- app.id().name(), node.id());
- }
- }
- }
- }
-
/**
* Prunes applications which are not in the map, but are on disk.
*/
@@ -449,6 +426,4 @@
appDesc.origin(), appDesc.permissions(),
appDesc.featuresRepo(), appDesc.features());
}
-
}
-
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index 2f6a149..10bf6a4 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -419,10 +419,9 @@
// Dispatch to all instances
clusterCommunicator.broadcastIncludeSelf(
- new ClusterMessage(
- clusterService.getLocalNode().id(),
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER.encode(leadershipEvent)));
+ leadershipEvent,
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER::encode);
} else {
//
// Test if time to expire a stale leader
@@ -491,11 +490,11 @@
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
+
clusterCommunicator.broadcastIncludeSelf(
- new ClusterMessage(
- clusterService.getLocalNode().id(),
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER.encode(leadershipEvent)));
+ leadershipEvent,
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER::encode);
}
// Sleep forever until interrupted
@@ -519,11 +518,12 @@
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
+
clusterCommunicator.broadcastIncludeSelf(
- new ClusterMessage(
- clusterService.getLocalNode().id(),
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER.encode(leadershipEvent)));
+ leadershipEvent,
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER::encode);
+
if (leaderLock.isLockedByCurrentThread()) {
leaderLock.unlock();
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 42a79b3..204e0fd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.store.cluster.messaging.impl;
-import com.google.common.util.concurrent.ListenableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -37,8 +36,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
@@ -122,7 +130,84 @@
return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
}
- private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
+ @Override
+ public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
+ SettableFuture<byte[]> response = SettableFuture.create();
+ sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((r, e) -> {
+ if (e == null) {
+ response.set(r);
+ } else {
+ response.setException(e);
+ }
+ });
+ return response;
+ }
+
+ @Override
+ public <M> void broadcast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder) {
+ multicast(message,
+ subject,
+ encoder,
+ clusterService.getNodes()
+ .stream()
+ .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
+ .map(ControllerNode::id)
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public <M> void broadcastIncludeSelf(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder) {
+ multicast(message,
+ subject,
+ encoder,
+ clusterService.getNodes()
+ .stream()
+ .map(ControllerNode::id)
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public <M> boolean unicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ NodeId toNodeId) {
+ byte[] payload = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ encoder.apply(message)).getBytes();
+ return unicastUnchecked(subject, payload, toNodeId);
+ }
+
+ @Override
+ public <M> void multicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Set<NodeId> nodes) {
+ byte[] payload = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ encoder.apply(message)).getBytes();
+ nodes.forEach(nodeId -> unicastUnchecked(subject, payload, nodeId));
+ }
+
+ @Override
+ public <M, R> CompletableFuture<R> sendAndReceive(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Function<byte[], R> decoder,
+ NodeId toNodeId) {
+ ClusterMessage envelope = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ encoder.apply(message));
+ return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
+ }
+
+ private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
@@ -131,37 +216,15 @@
return true;
} catch (IOException e) {
log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
- throw e;
- }
- }
-
- private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
- try {
- return unicast(subject, payload, toNodeId);
- } catch (IOException e) {
return false;
}
}
- @Override
- public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
+ private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
- try {
- return messagingService.sendAndReceive(nodeEp, message.subject().value(), message.getBytes());
-
- } catch (IOException e) {
- log.trace("Failed interaction with remote nodeId: " + toNodeId, e);
- throw e;
- }
- }
-
- @Override
- @Deprecated
- public void addSubscriber(MessageSubject subject,
- ClusterMessageHandler subscriber) {
- messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
+ return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
}
@Override
@@ -202,6 +265,60 @@
}
}
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Function<M, R> handler,
+ Function<R, byte[]> encoder,
+ ExecutorService executor) {
+ messagingService.registerHandler(subject.value(),
+ new InternalMessageResponder<>(decoder, encoder, handler),
+ executor);
+ }
+
+ @Override
+ public <M> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder,
+ Consumer<M> handler,
+ ExecutorService executor) {
+ messagingService.registerHandler(subject.value(),
+ new InternalMessageConsumer<>(decoder, handler),
+ executor);
+ }
+
+ private class InternalMessageResponder<M, R> implements MessageHandler {
+ private final Function<byte[], M> decoder;
+ private final Function<R, byte[]> encoder;
+ private final Function<M, R> handler;
+
+ public InternalMessageResponder(Function<byte[], M> decoder,
+ Function<R, byte[]> encoder,
+ Function<M, R> handler) {
+ this.decoder = decoder;
+ this.encoder = encoder;
+ this.handler = handler;
+ }
+ @Override
+ public void handle(Message message) throws IOException {
+ R response = handler.apply(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
+ message.respond(encoder.apply(response));
+ }
+ }
+
+ private class InternalMessageConsumer<M> implements MessageHandler {
+ private final Function<byte[], M> decoder;
+ private final Consumer<M> consumer;
+
+ public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
+ this.decoder = decoder;
+ this.consumer = consumer;
+ }
+ @Override
+ public void handle(Message message) throws IOException {
+ consumer.accept(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
+ }
+ }
+
public static final class InternalClusterMessage extends ClusterMessage {
private final Message rawMessage;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 8bebff7..cf3700b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -343,11 +343,9 @@
private void notifyPeers(LeadershipEvent event) {
eventDispatcher.post(event);
- clusterCommunicator.broadcast(
- new ClusterMessage(
- clusterService.getLocalNode().id(),
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER.encode(event)));
+ clusterCommunicator.broadcast(event,
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER::encode);
}
private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
@@ -366,11 +364,9 @@
if (updatedLeader) {
LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
eventDispatcher.post(event);
- clusterCommunicator.broadcast(
- new ClusterMessage(
- clusterService.getLocalNode().id(),
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER.encode(event)));
+ clusterCommunicator.broadcast(event,
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER::encode);
}
}
@@ -469,11 +465,9 @@
leaderBoard.forEach((path, leadership) -> {
if (leadership.leader().equals(localNodeId)) {
LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
- clusterCommunicator.broadcast(
- new ClusterMessage(
- clusterService.getLocalNode().id(),
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER.encode(event)));
+ clusterCommunicator.broadcast(event,
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER::encode);
}
});
} catch (Exception e) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 131000b..b47376e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -304,11 +304,9 @@
DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
providerId, deviceId, deviceDescription);
- ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
- SERIALIZER.encode(deviceInjectedEvent));
// TODO check unicast return value
- clusterCommunicator.unicast(clusterMessage, deviceNode);
+ clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode);
/* error log:
log.warn("Failed to process injected device id: {} desc: {} " +
"(cluster messaging failed: {})",
@@ -555,11 +553,9 @@
}
PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
- ClusterMessage clusterMessage = new ClusterMessage(
- localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
//TODO check unicast return value
- clusterCommunicator.unicast(clusterMessage, deviceNode);
+ clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode);
/* error log:
log.warn("Failed to process injected ports of device id: {} " +
"(cluster messaging failed: {})",
@@ -867,13 +863,8 @@
log.debug("{} has control of {}, forwarding remove request",
master, deviceId);
- ClusterMessage message = new ClusterMessage(
- myId,
- DEVICE_REMOVE_REQ,
- SERIALIZER.encode(deviceId));
-
// TODO check unicast return value
- clusterCommunicator.unicast(message, master);
+ clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
/* error log:
log.error("Failed to forward {} remove request to {}", deviceId, master, e);
*/
@@ -1057,19 +1048,11 @@
}
private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.unicast(message, recipient);
+ clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
}
private void broadcastMessage(MessageSubject subject, Object event) {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
+ clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
}
private void notifyPeers(InternalDeviceEvent event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 33e4251..c46131e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -510,11 +510,7 @@
}
private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- serializer.encode(event));
- return clusterCommunicator.unicast(message, peer);
+ return clusterCommunicator.unicast(event, subject, serializer::encode, peer);
// Note: we had this flipped before...
// communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 781e24b..f5807e2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -22,6 +22,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
import com.hazelcast.core.IMap;
import org.apache.felix.scr.annotations.Activate;
@@ -35,6 +36,7 @@
import org.onlab.util.BoundedThreadPool;
import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
+import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
@@ -93,7 +95,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -360,22 +361,16 @@
log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), rule.deviceId());
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- FlowStoreMessageSubjects.GET_FLOW_ENTRY,
- SERIALIZER.encode(rule));
-
- try {
- Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
- log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
- }
- return null;
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
+ FlowStoreMessageSubjects.GET_FLOW_ENTRY,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master().get()),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ null);
}
-
-
@Override
public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
@@ -393,22 +388,16 @@
log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GET_DEVICE_FLOW_ENTRIES,
- SERIALIZER.encode(deviceId));
-
- try {
- Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
- log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
- }
- return Collections.emptyList();
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
+ FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master().get()),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptyList());
}
-
-
@Override
public void storeFlowRule(FlowRule rule) {
storeBatch(new FlowRuleBatchOperation(
@@ -453,14 +442,10 @@
log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
- ClusterMessage message = new ClusterMessage(
- local,
- APPLY_BATCH_FLOWS,
- SERIALIZER.encode(operation));
-
-
- if (!clusterCommunicator.unicast(message, replicaInfo.master().get())) {
- log.warn("Failed to storeBatch: {} to {}", message, replicaInfo.master());
+ if (!clusterCommunicator.unicast(operation,
+ APPLY_BATCH_FLOWS, SERIALIZER::encode,
+ replicaInfo.master().get())) {
+ log.warn("Failed to storeBatch: {} to {}", operation, replicaInfo.master());
Set<FlowRule> allFailures = operation.getOperations().stream()
.map(op -> op.target())
@@ -612,18 +597,15 @@
log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- REMOVE_FLOW_ENTRY,
- SERIALIZER.encode(rule));
-
- try {
- Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
- // TODO: Retry against latest master or throw a FlowStoreException
- throw new RuntimeException(e);
- }
+ return Futures.get(clusterCommunicator.sendAndReceive(
+ rule,
+ REMOVE_FLOW_ENTRY,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master().get()),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ RuntimeException.class);
}
private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
@@ -649,12 +631,8 @@
if (nodeId == null) {
notifyDelegate(event);
} else {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- REMOTE_APPLY_COMPLETED,
- SERIALIZER.encode(event));
// TODO check unicast return value
- clusterCommunicator.unicast(message, nodeId);
+ clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
//error log: log.warn("Failed to respond to peer for batch operation result");
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
index 708faf6..b609a3c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
@@ -20,6 +20,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -45,7 +46,6 @@
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
-import org.onosproject.store.serializers.DecodeTo;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
@@ -199,18 +199,12 @@
log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
- ClusterMessage message = new ClusterMessage(clusterService
- .getLocalNode().id(), APPLY_EXTEND_FLOWS, SERIALIZER.encode(batchOperation));
-
- try {
- ListenableFuture<byte[]> responseFuture = clusterCommunicator
- .sendAndReceive(message, replicaInfo.master().get());
- // here should add another decode process
- return Futures.transform(responseFuture,
- new DecodeTo<FlowExtCompletedOperation>(SERIALIZER));
- } catch (IOException e) {
- return Futures.immediateFailedFuture(e);
- }
+ return clusterCommunicator.sendAndReceive(
+ batchOperation,
+ APPLY_EXTEND_FLOWS,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master().get());
}
/**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 51b4111..ae1669b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -382,17 +382,13 @@
GroupStoreMessage groupOp = GroupStoreMessage.
createGroupAddRequestMsg(groupDesc.deviceId(),
groupDesc);
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GroupStoreMessageSubjects.
- REMOTE_GROUP_OP_REQUEST,
- kryoBuilder.build().serialize(groupOp));
- if (!clusterCommunicator.unicast(message,
- mastershipService.
- getMasterFor(
- groupDesc.deviceId()))) {
+
+ if (!clusterCommunicator.unicast(groupOp,
+ GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+ m -> kryoBuilder.build().serialize(m),
+ mastershipService.getMasterFor(groupDesc.deviceId()))) {
log.warn("Failed to send request to master: {} to {}",
- message,
+ groupOp,
mastershipService.getMasterFor(groupDesc.deviceId()));
//TODO: Send Group operation failure event
}
@@ -472,16 +468,13 @@
type,
newBuckets,
newAppCookie);
- ClusterMessage message =
- new ClusterMessage(clusterService.getLocalNode().id(),
- GroupStoreMessageSubjects.
- REMOTE_GROUP_OP_REQUEST,
- kryoBuilder.build().serialize(groupOp));
- if (!clusterCommunicator.unicast(message,
- mastershipService.
- getMasterFor(deviceId))) {
+
+ if (!clusterCommunicator.unicast(groupOp,
+ GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+ m -> kryoBuilder.build().serialize(m),
+ mastershipService.getMasterFor(deviceId))) {
log.warn("Failed to send request to master: {} to {}",
- message,
+ groupOp,
mastershipService.getMasterFor(deviceId));
//TODO: Send Group operation failure event
}
@@ -584,16 +577,13 @@
GroupStoreMessage groupOp = GroupStoreMessage.
createGroupDeleteRequestMsg(deviceId,
appCookie);
- ClusterMessage message =
- new ClusterMessage(clusterService.getLocalNode().id(),
- GroupStoreMessageSubjects.
- REMOTE_GROUP_OP_REQUEST,
- kryoBuilder.build().serialize(groupOp));
- if (!clusterCommunicator.unicast(message,
- mastershipService.
- getMasterFor(deviceId))) {
+
+ if (!clusterCommunicator.unicast(groupOp,
+ GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+ m -> kryoBuilder.build().serialize(m),
+ mastershipService.getMasterFor(deviceId))) {
log.warn("Failed to send request to master: {} to {}",
- message,
+ groupOp,
mastershipService.getMasterFor(deviceId));
//TODO: Send Group operation failure event
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
index 74236c3..1e6f8aa 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
@@ -477,21 +477,13 @@
}
private void broadcastMessage(MessageSubject subject, Object event) {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
+ clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
}
private void unicastMessage(NodeId peer,
MessageSubject subject,
Object event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.unicast(message, peer);
+ clusterCommunicator.unicast(event, subject, SERIALIZER::encode, peer);
}
private void notifyDelegateIfNotNull(HostEvent event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
index 48c430b..5e09d05 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/link/impl/GossipLinkStore.java
@@ -334,17 +334,12 @@
LinkInjectedEvent linkInjectedEvent = new LinkInjectedEvent(providerId, linkDescription);
- ClusterMessage linkInjectedMessage = new ClusterMessage(localNode,
- GossipLinkStoreMessageSubjects.LINK_INJECTED, SERIALIZER.encode(linkInjectedEvent));
// TODO check unicast return value
- clusterCommunicator.unicast(linkInjectedMessage, dstNode);
- /* error log:
- log.warn("Failed to process link update between src: {} and dst: {} " +
- "(cluster messaging failed: {})",
- linkDescription.src(), linkDescription.dst(), e);
- */
-
+ clusterCommunicator.unicast(linkInjectedEvent,
+ GossipLinkStoreMessageSubjects.LINK_INJECTED,
+ SERIALIZER::encode,
+ dstNode);
}
return linkEvent;
@@ -653,19 +648,11 @@
}
private void broadcastMessage(MessageSubject subject, Object event) {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.broadcast(message);
+ clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
}
private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- SERIALIZER.encode(event));
- clusterCommunicator.unicast(message, recipient);
+ clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
}
private void notifyPeers(InternalLinkEvent event) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 25870b7..8a54502 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -181,20 +181,14 @@
} else {
return MastershipRole.NONE;
}
- } else {
- try {
- MastershipRole role = complete(clusterCommunicator.sendAndReceive(
- new ClusterMessage(
- localNodeId,
- ROLE_QUERY_SUBJECT,
- SERIALIZER.encode(deviceId)),
- nodeId));
- return role == null ? MastershipRole.NONE : role;
- } catch (IOException e) {
- log.warn("Failed to query {} for {}'s role. Defaulting to NONE", nodeId, deviceId, e);
- return MastershipRole.NONE;
- }
}
+ MastershipRole role = complete(clusterCommunicator.sendAndReceive(
+ deviceId,
+ ROLE_QUERY_SUBJECT,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ nodeId));
+ return role == null ? MastershipRole.NONE : role;
}
@Override
@@ -276,17 +270,12 @@
if (!nodeId.equals(localNodeId)) {
log.debug("Forwarding request to relinquish "
+ "role for device {} to {}", deviceId, nodeId);
- try {
- return complete(clusterCommunicator.sendAndReceive(
- new ClusterMessage(
- localNodeId,
- ROLE_RELINQUISH_SUBJECT,
- SERIALIZER.encode(deviceId)),
- nodeId));
- } catch (IOException e) {
- log.warn("Failed to send a request to relinquish role for {} to {}", deviceId, nodeId, e);
- return null;
- }
+ return complete(clusterCommunicator.sendAndReceive(
+ deviceId,
+ ROLE_RELINQUISH_SUBJECT,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ nodeId));
}
// Check if this node is can be managed by this node.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 7ee8712..8bf8ae6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -131,9 +131,7 @@
}
// TODO check unicast return value
- communicationService.unicast(new ClusterMessage(myId, PACKET_OUT_SUBJECT,
- SERIALIZER.encode(packet)),
- master);
+ communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master);
// error log: log.warn("Failed to send packet-out to {}", master);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
index f07c0e8..e314789 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedStatisticStore.java
@@ -16,6 +16,7 @@
package org.onosproject.store.statistic.impl;
import com.google.common.collect.Sets;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -23,6 +24,7 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
@@ -47,12 +49,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.onlab.util.Tools.groupedThreads;
@@ -218,20 +217,15 @@
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return getCurrentStatisticInternal(connectPoint);
} else {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GET_CURRENT,
- SERIALIZER.encode(connectPoint));
-
- try {
- Future<byte[]> response =
- clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
- log.warn("Unable to communicate with peer {}", replicaInfo.master().get());
- return Collections.emptySet();
- }
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+ connectPoint,
+ GET_CURRENT,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master().get()),
+ STATISTIC_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptySet());
}
}
@@ -251,22 +245,16 @@
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return getPreviousStatisticInternal(connectPoint);
} else {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GET_PREVIOUS,
- SERIALIZER.encode(connectPoint));
-
- try {
- Future<byte[]> response =
- clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
- log.warn("Unable to communicate with peer {}", replicaInfo.master().get());
- return Collections.emptySet();
- }
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+ connectPoint,
+ GET_PREVIOUS,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master().get()),
+ STATISTIC_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptySet());
}
-
}
private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
diff --git a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
index 7054bd3..f271de4 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/device/impl/GossipDeviceStoreTest.java
@@ -65,6 +65,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import static java.util.Arrays.asList;
import static org.easymock.EasyMock.*;
@@ -181,8 +182,9 @@
new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
HW, swVersion, SN, CID, annotations);
reset(clusterCommunicator);
- expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
- .andReturn(true).anyTimes();
+ clusterCommunicator.<InternalDeviceEvent>broadcast(
+ anyObject(InternalDeviceEvent.class), anyObject(MessageSubject.class), anyObject(Function.class));
+ expectLastCall().anyTimes();
replay(clusterCommunicator);
deviceStore.createOrUpdateDevice(PID, deviceId, description);
verify(clusterCommunicator);
@@ -296,36 +298,43 @@
}
private void assertInternalDeviceEvent(NodeId sender,
- DeviceId deviceId,
- ProviderId providerId,
- DeviceDescription expectedDesc,
- Capture<ClusterMessage> actualMsg) {
- assertTrue(actualMsg.hasCaptured());
- assertEquals(sender, actualMsg.getValue().sender());
+ DeviceId deviceId,
+ ProviderId providerId,
+ DeviceDescription expectedDesc,
+ Capture<InternalDeviceEvent> actualEvent,
+ Capture<MessageSubject> actualSubject,
+ Capture<Function<InternalDeviceEvent, byte[]>> actualEncoder) {
+ assertTrue(actualEvent.hasCaptured());
+ assertTrue(actualSubject.hasCaptured());
+ assertTrue(actualEncoder.hasCaptured());
+
assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
- actualMsg.getValue().subject());
- InternalDeviceEvent addEvent
- = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
- assertEquals(deviceId, addEvent.deviceId());
- assertEquals(providerId, addEvent.providerId());
- assertDeviceDescriptionEquals(expectedDesc, addEvent.deviceDescription().value());
+ actualSubject.getValue());
+ assertEquals(deviceId, actualEvent.getValue().deviceId());
+ assertEquals(providerId, actualEvent.getValue().providerId());
+ assertDeviceDescriptionEquals(expectedDesc, actualEvent.getValue().deviceDescription().value());
}
private void assertInternalDeviceEvent(NodeId sender,
- DeviceId deviceId,
- ProviderId providerId,
- DeviceDescription expectedDesc,
- List<SparseAnnotations> expectedAnnotations,
- Capture<ClusterMessage> actualMsg) {
- assertTrue(actualMsg.hasCaptured());
- assertEquals(sender, actualMsg.getValue().sender());
+ DeviceId deviceId,
+ ProviderId providerId,
+ DeviceDescription expectedDesc,
+ List<SparseAnnotations> expectedAnnotations,
+ Capture<InternalDeviceEvent> actualEvent,
+ Capture<MessageSubject> actualSubject,
+ Capture<Function<InternalDeviceEvent, byte[]>> actualEncoder) {
+ assertTrue(actualEvent.hasCaptured());
+ assertTrue(actualSubject.hasCaptured());
+ assertTrue(actualEncoder.hasCaptured());
+
assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
- actualMsg.getValue().subject());
- InternalDeviceEvent addEvent
- = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
- assertEquals(deviceId, addEvent.deviceId());
- assertEquals(providerId, addEvent.providerId());
- assertDeviceDescriptionEquals(expectedDesc, expectedAnnotations, addEvent.deviceDescription().value());
+ actualSubject.getValue());
+ assertEquals(deviceId, actualEvent.getValue().deviceId());
+ assertEquals(providerId, actualEvent.getValue().providerId());
+ assertDeviceDescriptionEquals(
+ expectedDesc,
+ expectedAnnotations,
+ actualEvent.getValue().deviceDescription().value());
}
@Test
@@ -333,26 +342,28 @@
DeviceDescription description =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW1, SN, CID);
- Capture<ClusterMessage> bcast = new Capture<>();
+ Capture<InternalDeviceEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description);
assertEquals(DEVICE_ADDED, event.type());
assertDevice(DID1, SW1, event.subject());
verify(clusterCommunicator);
- assertInternalDeviceEvent(NID1, DID1, PID, description, bcast);
+ assertInternalDeviceEvent(NID1, DID1, PID, description, message, subject, encoder);
DeviceDescription description2 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW2, SN, CID);
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
assertEquals(DEVICE_UPDATED, event2.type());
assertDevice(DID1, SW2, event2.subject());
verify(clusterCommunicator);
- assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast);
+ assertInternalDeviceEvent(NID1, DID1, PID, description2, message, subject, encoder);
reset(clusterCommunicator);
assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
@@ -366,7 +377,11 @@
HW, SW1, SN, CID, A2);
Capture<ClusterMessage> bcast = new Capture<>();
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ Capture<InternalDeviceEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
+
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description);
assertEquals(DEVICE_ADDED, event.type());
assertDevice(DID1, SW1, event.subject());
@@ -374,13 +389,13 @@
assertAnnotationsEquals(event.subject().annotations(), A2);
assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1));
verify(clusterCommunicator);
- assertInternalDeviceEvent(NID1, DID1, PIDA, description, bcast);
+ assertInternalDeviceEvent(NID1, DID1, PIDA, description, message, subject, encoder);
// update from primary
DeviceDescription description2 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW2, SN, CID, A1);
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
assertEquals(DEVICE_UPDATED, event2.type());
@@ -389,17 +404,17 @@
assertAnnotationsEquals(event2.subject().annotations(), A1, A2);
assertTrue(deviceStore.isAvailable(DID1));
verify(clusterCommunicator);
- assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast);
+ assertInternalDeviceEvent(NID1, DID1, PID, description2, message, subject, encoder);
// no-op update from primary
- resetCommunicatorExpectingNoBroadcast(bcast);
+ resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
verify(clusterCommunicator);
assertFalse("no broadcast expected", bcast.hasCaptured());
// For now, Ancillary is ignored once primary appears
- resetCommunicatorExpectingNoBroadcast(bcast);
+ resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description));
@@ -410,7 +425,7 @@
DeviceDescription description3 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW1, SN, CID, A2_2);
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
DeviceEvent event3 = deviceStore.createOrUpdateDevice(PIDA, DID1, description3);
assertEquals(DEVICE_UPDATED, event3.type());
@@ -423,7 +438,7 @@
verify(clusterCommunicator);
// note: only annotation from PIDA is sent over the wire
assertInternalDeviceEvent(NID1, DID1, PIDA, description3,
- asList(union(A2, A2_2)), bcast);
+ asList(union(A2, A2_2)), message, subject, encoder);
}
@@ -434,23 +449,25 @@
putDevice(DID1, SW1);
assertTrue(deviceStore.isAvailable(DID1));
- Capture<ClusterMessage> bcast = new Capture<>();
+ Capture<InternalDeviceEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
DeviceEvent event = deviceStore.markOffline(DID1);
assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type());
assertDevice(DID1, SW1, event.subject());
assertFalse(deviceStore.isAvailable(DID1));
verify(clusterCommunicator);
// TODO: verify broadcast message
- assertTrue(bcast.hasCaptured());
+ assertTrue(message.hasCaptured());
- resetCommunicatorExpectingNoBroadcast(bcast);
+ resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
DeviceEvent event2 = deviceStore.markOffline(DID1);
assertNull("No change, no event", event2);
verify(clusterCommunicator);
- assertFalse(bcast.hasCaptured());
+ assertFalse(message.hasCaptured());
}
@Test
@@ -460,13 +477,15 @@
new DefaultPortDescription(P1, true),
new DefaultPortDescription(P2, true)
);
- Capture<ClusterMessage> bcast = new Capture<>();
+ Capture<InternalDeviceEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
verify(clusterCommunicator);
// TODO: verify broadcast message
- assertTrue(bcast.hasCaptured());
+ assertTrue(message.hasCaptured());
Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
for (DeviceEvent event : events) {
@@ -485,11 +504,11 @@
new DefaultPortDescription(P3, true)
);
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
events = deviceStore.updatePorts(PID, DID1, pds2);
verify(clusterCommunicator);
// TODO: verify broadcast message
- assertTrue(bcast.hasCaptured());
+ assertTrue(message.hasCaptured());
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
@@ -513,11 +532,11 @@
new DefaultPortDescription(P1, false),
new DefaultPortDescription(P2, true)
);
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
events = deviceStore.updatePorts(PID, DID1, pds3);
verify(clusterCommunicator);
// TODO: verify broadcast message
- assertTrue(bcast.hasCaptured());
+ assertTrue(message.hasCaptured());
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
@@ -544,9 +563,11 @@
);
deviceStore.updatePorts(PID, DID1, pds);
- Capture<ClusterMessage> bcast = new Capture<>();
+ Capture<InternalPortStatusEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalPortStatusEvent, byte[]>> encoder = new Capture<>();
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
final DefaultPortDescription desc = new DefaultPortDescription(P1, false);
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc);
assertEquals(PORT_UPDATED, event.type());
@@ -554,8 +575,8 @@
assertEquals(P1, event.port().number());
assertFalse("Port is disabled", event.port().isEnabled());
verify(clusterCommunicator);
- assertInternalPortStatusEvent(NID1, DID1, PID, desc, NO_ANNOTATION, bcast);
- assertTrue(bcast.hasCaptured());
+ assertInternalPortStatusEvent(NID1, DID1, PID, desc, NO_ANNOTATION, message, subject, encoder);
+ assertTrue(message.hasCaptured());
}
@Test
@@ -567,11 +588,13 @@
);
deviceStore.updatePorts(PID, DID1, pds);
- Capture<ClusterMessage> bcast = new Capture<>();
-
+ Capture<InternalPortStatusEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalPortStatusEvent, byte[]>> encoder = new Capture<>();
// update port from primary
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
+
final DefaultPortDescription desc1 = new DefaultPortDescription(P1, false, A1_2);
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc1);
assertEquals(PORT_UPDATED, event.type());
@@ -580,19 +603,19 @@
assertAnnotationsEquals(event.port().annotations(), A1, A1_2);
assertFalse("Port is disabled", event.port().isEnabled());
verify(clusterCommunicator);
- assertInternalPortStatusEvent(NID1, DID1, PID, desc1, asList(A1, A1_2), bcast);
- assertTrue(bcast.hasCaptured());
+ assertInternalPortStatusEvent(NID1, DID1, PID, desc1, asList(A1, A1_2), message, subject, encoder);
+ assertTrue(message.hasCaptured());
// update port from ancillary with no attributes
- resetCommunicatorExpectingNoBroadcast(bcast);
+ resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
final DefaultPortDescription desc2 = new DefaultPortDescription(P1, true);
DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1, desc2);
assertNull("Ancillary is ignored if primary exists", event2);
verify(clusterCommunicator);
- assertFalse(bcast.hasCaptured());
+ assertFalse(message.hasCaptured());
// but, Ancillary annotation update will be notified
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
final DefaultPortDescription desc3 = new DefaultPortDescription(P1, true, A2);
DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1, desc3);
assertEquals(PORT_UPDATED, event3.type());
@@ -601,11 +624,11 @@
assertAnnotationsEquals(event3.port().annotations(), A1, A1_2, A2);
assertFalse("Port is disabled", event3.port().isEnabled());
verify(clusterCommunicator);
- assertInternalPortStatusEvent(NID1, DID1, PIDA, desc3, asList(A2), bcast);
- assertTrue(bcast.hasCaptured());
+ assertInternalPortStatusEvent(NID1, DID1, PIDA, desc3, asList(A2), message, subject, encoder);
+ assertTrue(message.hasCaptured());
// port only reported from Ancillary will be notified as down
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
final DefaultPortDescription desc4 = new DefaultPortDescription(P2, true);
DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1, desc4);
assertEquals(PORT_ADDED, event4.type());
@@ -616,25 +639,29 @@
event4.port().isEnabled());
verify(clusterCommunicator);
// TODO: verify broadcast message content
- assertInternalPortStatusEvent(NID1, DID1, PIDA, desc4, NO_ANNOTATION, bcast);
- assertTrue(bcast.hasCaptured());
+ assertInternalPortStatusEvent(NID1, DID1, PIDA, desc4, NO_ANNOTATION, message, subject, encoder);
+ assertTrue(message.hasCaptured());
}
- private void assertInternalPortStatusEvent(NodeId sender, DeviceId did,
- ProviderId pid, DefaultPortDescription expectedDesc,
- List<SparseAnnotations> expectedAnnotations, Capture<ClusterMessage> actualMsg) {
+ private void assertInternalPortStatusEvent(NodeId sender,
+ DeviceId did,
+ ProviderId pid,
+ DefaultPortDescription expectedDesc,
+ List<SparseAnnotations> expectedAnnotations,
+ Capture<InternalPortStatusEvent> actualEvent,
+ Capture<MessageSubject> actualSubject,
+ Capture<Function<InternalPortStatusEvent, byte[]>> actualEncoder) {
- assertTrue(actualMsg.hasCaptured());
- assertEquals(sender, actualMsg.getValue().sender());
+ assertTrue(actualEvent.hasCaptured());
+ assertTrue(actualSubject.hasCaptured());
+ assertTrue(actualEncoder.hasCaptured());
+
assertEquals(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
- actualMsg.getValue().subject());
- InternalPortStatusEvent addEvent
- = testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
- assertEquals(did, addEvent.deviceId());
- assertEquals(pid, addEvent.providerId());
+ actualSubject.getValue());
+ assertEquals(did, actualEvent.getValue().deviceId());
+ assertEquals(pid, actualEvent.getValue().providerId());
assertPortDescriptionEquals(expectedDesc, expectedAnnotations,
- addEvent.portDescription().value());
-
+ actualEvent.getValue().portDescription().value());
}
private void assertPortDescriptionEquals(
@@ -649,19 +676,31 @@
expectedAnnotations.toArray(new SparseAnnotations[0]));
}
- private void resetCommunicatorExpectingNoBroadcast(
- Capture<ClusterMessage> bcast) {
- bcast.reset();
+ private <T> void resetCommunicatorExpectingNoBroadcast(
+ Capture<T> message,
+ Capture<MessageSubject> subject,
+ Capture<Function<T, byte[]>> encoder) {
+ message.reset();
+ subject.reset();
+ encoder.reset();
reset(clusterCommunicator);
replay(clusterCommunicator);
}
- private void resetCommunicatorExpectingSingleBroadcast(
- Capture<ClusterMessage> bcast) {
+ private <T> void resetCommunicatorExpectingSingleBroadcast(
+ Capture<T> message,
+ Capture<MessageSubject> subject,
+ Capture<Function<T, byte[]>> encoder) {
- bcast.reset();
+ message.reset();
+ subject.reset();
+ encoder.reset();
reset(clusterCommunicator);
- expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
+ clusterCommunicator.broadcast(
+ capture(message),
+ capture(subject),
+ capture(encoder));
+ expectLastCall().once();
replay(clusterCommunicator);
}
@@ -724,9 +763,11 @@
assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1);
assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2);
- Capture<ClusterMessage> bcast = new Capture<>();
+ Capture<InternalDeviceEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
DeviceEvent event = deviceStore.removeDevice(DID1);
assertEquals(DEVICE_REMOVED, event.type());
@@ -736,7 +777,7 @@
assertEquals(0, deviceStore.getPorts(DID1).size());
verify(clusterCommunicator);
// TODO: verify broadcast message
- assertTrue(bcast.hasCaptured());
+ assertTrue(message.hasCaptured());
// putBack Device, Port w/o annotation
putDevice(DID1, SW1);
@@ -825,10 +866,6 @@
this.clusterService = clusterService;
this.clusterCommunicator = clusterCommunicator;
}
-
- public <T> T deserialize(byte[] bytes) {
- return SERIALIZER.decode(bytes);
- }
}
private static final class TestClusterService extends StaticClusterService {
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 0a3d5e6..8d495eb 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -30,6 +30,7 @@
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
+import org.onosproject.event.AbstractEvent;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
@@ -44,17 +45,20 @@
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
import static com.google.common.base.Preconditions.checkArgument;
import static junit.framework.TestCase.assertFalse;
@@ -281,7 +285,7 @@
// Set up expected internal message to be broadcast to peers on first put
expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
- .peekAtNextTimestamp()), clusterCommunicator);
+ .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
// Put first value
assertNull(ecMap.get(KEY1));
@@ -292,7 +296,7 @@
// Set up expected internal message to be broadcast to peers on second put
expectSpecificMulticastMessage(generatePutMessage(
- KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
+ KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
// Update same key to a new value
ecMap.put(KEY1, VALUE2);
@@ -341,7 +345,7 @@
// Remove the value and check the correct internal cluster messages
// are sent
expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
- clusterCommunicator);
+ UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
ecMap.remove(KEY1);
assertNull(ecMap.get(KEY1));
@@ -352,7 +356,7 @@
// the map, we expect that the tombstone is updated and another remove
// event is sent to the cluster and external listeners.
expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
- clusterCommunicator);
+ UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
ecMap.remove(KEY1);
assertNull(ecMap.get(KEY1));
@@ -402,7 +406,7 @@
ecMap.addListener(listener);
// Expect a multi-update inter-instance message
- expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
+ expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
clusterCommunicator);
Map<String, String> putAllValues = new HashMap<>();
@@ -441,7 +445,7 @@
ecMap.put(KEY2, VALUE2);
ecMap.addListener(listener);
- expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
+ expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
ecMap.clear();
@@ -605,7 +609,8 @@
SERIALIZER.encode(Lists.newArrayList(event)));
}
- private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
+ private List<PutEntry<String, String>> generatePutMessage(
+ String key1, String value1, String key2, String value2) {
ArrayList<PutEntry<String, String>> list = new ArrayList<>();
Timestamp timestamp1 = clockService.peek(1);
@@ -617,10 +622,7 @@
list.add(pe1);
list.add(pe2);
-
- return new ClusterMessage(
- clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
- SERIALIZER.encode(list));
+ return list;
}
private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
@@ -631,7 +633,7 @@
SERIALIZER.encode(Lists.newArrayList(event)));
}
- private ClusterMessage generateRemoveMessage(String key1, String key2) {
+ private List<RemoveEntry<String, String>> generateRemoveMessage(String key1, String key2) {
ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
Timestamp timestamp1 = clockService.peek(1);
@@ -643,9 +645,7 @@
list.add(re1);
list.add(re2);
- return new ClusterMessage(
- clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
- SERIALIZER.encode(list));
+ return list;
}
/**
@@ -656,13 +656,13 @@
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
//FIXME rename
- private static void expectSpecificBroadcastMessage(ClusterMessage m,
- ClusterCommunicationService clusterCommunicator) {
+ private static <T> void expectSpecificBroadcastMessage(
+ T message,
+ MessageSubject subject,
+ ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
-// expect(clusterCommunicator.broadcast(m)).andReturn(true);
- expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
- .andReturn(true)
- .anyTimes();
+ clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
+ expectLastCall().anyTimes();
replay(clusterCommunicator);
}
@@ -670,17 +670,16 @@
* Sets up a mock ClusterCommunicationService to expect a specific cluster
* message to be multicast to the cluster.
*
- * @param m message we expect to be sent
+ * @param message message we expect to be sent
+ * @param subject subject we expect to be sent to
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
//FIXME rename
- private static void expectSpecificMulticastMessage(ClusterMessage m,
+ private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
-// expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
- expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
- .andReturn(true)
- .anyTimes();
+ clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
+ expectLastCall().anyTimes();
replay(clusterCommunicator);
}
@@ -693,12 +692,15 @@
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
//FIXME rename
- private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
+ private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
// anyObject(Iterable.class)))
- expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class),
- anyObject(NodeId.class)))
+ expect(clusterCommunicator.<T>unicast(
+ anyObject(),
+ anyObject(MessageSubject.class),
+ anyObject(Function.class),
+ anyObject(NodeId.class)))
.andReturn(true)
.anyTimes();
replay(clusterCommunicator);
@@ -711,15 +713,14 @@
*
* @param clusterCommunicator a mock ClusterCommunicationService to set up
*/
- //FIXME rename
private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
reset(clusterCommunicator);
-// expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
-// .andReturn(true)
-// .anyTimes();
- expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class), anyObject(NodeId.class)))
- .andReturn(true)
- .anyTimes();
+ clusterCommunicator.<AbstractEvent>multicast(
+ anyObject(AbstractEvent.class),
+ anyObject(MessageSubject.class),
+ anyObject(Function.class),
+ anyObject(Set.class));
+ expectLastCall().anyTimes();
replay(clusterCommunicator);
}
@@ -733,45 +734,6 @@
implements ClusterCommunicationService {
@Override
- public boolean broadcast(ClusterMessage message) {
- return false;
- }
-
- @Override
- public boolean broadcastIncludeSelf(ClusterMessage message) {
- return false;
- }
-
- @Override
- public boolean unicast(ClusterMessage message, NodeId toNodeId) {
- return false;
- }
-
- @Override
- public boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds) {
- return false;
- }
-
- @Override
- public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
- NodeId toNodeId)
- throws IOException {
- return null;
- }
-
- @Override
- public void addSubscriber(MessageSubject subject,
- ClusterMessageHandler subscriber) {
- if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
- updateHandler = subscriber;
- } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
- antiEntropyHandler = subscriber;
- } else {
- throw new RuntimeException("Unexpected message subject " + subject.toString());
- }
- }
-
- @Override
public void addSubscriber(MessageSubject subject,
ClusterMessageHandler subscriber,
ExecutorService executor) {
@@ -786,6 +748,73 @@
@Override
public void removeSubscriber(MessageSubject subject) {}
+
+ @Override
+ public <M> void broadcast(M message, MessageSubject subject,
+ Function<M, byte[]> encoder) {
+ }
+
+ @Override
+ public <M> void broadcastIncludeSelf(M message,
+ MessageSubject subject, Function<M, byte[]> encoder) {
+ }
+
+ @Override
+ public <M> boolean unicast(M message, MessageSubject subject,
+ Function<M, byte[]> encoder, NodeId toNodeId) {
+ return false;
+ }
+
+ @Override
+ public <M> void multicast(M message, MessageSubject subject,
+ Function<M, byte[]> encoder, Set<NodeId> nodes) {
+ }
+
+ @Override
+ public <M, R> CompletableFuture<R> sendAndReceive(M message,
+ MessageSubject subject, Function<M, byte[]> encoder,
+ Function<byte[], R> decoder, NodeId toNodeId) {
+ return null;
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder, Function<M, R> handler,
+ Function<R, byte[]> encoder, ExecutorService executor) {
+ }
+
+ @Override
+ public <M> void addSubscriber(MessageSubject subject,
+ Function<byte[], M> decoder, Consumer<M> handler,
+ ExecutorService executor) {
+ }
+
+ @Override
+ public boolean broadcast(ClusterMessage message) {
+ return false;
+ }
+
+ @Override
+ public boolean broadcastIncludeSelf(ClusterMessage message) {
+ return false;
+ }
+
+ @Override
+ public boolean unicast(ClusterMessage message, NodeId toNodeId) {
+ return false;
+ }
+
+ @Override
+ public boolean multicast(ClusterMessage message,
+ Iterable<NodeId> nodeIds) {
+ return false;
+ }
+
+ @Override
+ public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
+ NodeId toNodeId) {
+ return null;
+ }
}
/**
diff --git a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
index 03e82bc..b0da337 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/link/impl/GossipLinkStoreTest.java
@@ -28,7 +28,6 @@
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
-import org.onosproject.mastership.MastershipService;
import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.ConnectPoint;
@@ -48,7 +47,6 @@
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.cluster.StaticClusterService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.device.impl.DeviceClockManager;
@@ -59,6 +57,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
@@ -119,7 +118,6 @@
private DeviceClockManager deviceClockManager;
private DeviceClockService deviceClockService;
private ClusterCommunicationService clusterCommunicator;
- private MastershipService mastershipService;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -171,26 +169,24 @@
ConnectPoint src = new ConnectPoint(srcId, srcNum);
ConnectPoint dst = new ConnectPoint(dstId, dstNum);
reset(clusterCommunicator);
- expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
- .andReturn(true).anyTimes();
+ clusterCommunicator.<InternalLinkEvent>broadcast(
+ anyObject(InternalLinkEvent.class), anyObject(MessageSubject.class), anyObject(Function.class));
+ expectLastCall().anyTimes();
replay(clusterCommunicator);
linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type, annotations));
verify(clusterCommunicator);
}
- private void resetCommunicatorExpectingNoBroadcast(
- Capture<ClusterMessage> bcast) {
- bcast.reset();
+ private <T> void resetCommunicatorExpectingSingleBroadcast(
+ Capture<T> message,
+ Capture<MessageSubject> subject,
+ Capture<Function<T, byte[]>> encoder) {
+ message.reset();
+ subject.reset();
+ encoder.reset();
reset(clusterCommunicator);
- replay(clusterCommunicator);
- }
-
- private void resetCommunicatorExpectingSingleBroadcast(
- Capture<ClusterMessage> bcast) {
-
- bcast.reset();
- reset(clusterCommunicator);
- expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
+ clusterCommunicator.broadcast(capture(message), capture(subject), capture(encoder));
+ expectLastCall().once();
replay(clusterCommunicator);
}
@@ -367,56 +363,55 @@
ConnectPoint src = new ConnectPoint(DID1, P1);
ConnectPoint dst = new ConnectPoint(DID2, P2);
- Capture<ClusterMessage> bcast = new Capture<>();
+ Capture<InternalLinkEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalLinkEvent, byte[]>> encoder = new Capture<>();
// add link
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
final DefaultLinkDescription linkDescription = new DefaultLinkDescription(src, dst, INDIRECT);
LinkEvent event = linkStore.createOrUpdateLink(PID,
linkDescription);
- verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, bcast);
+ verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, INDIRECT, event.subject());
assertEquals(LINK_ADDED, event.type());
// update link type
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event2 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT));
- verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, bcast);
+ verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, DIRECT, event2.subject());
assertEquals(LINK_UPDATED, event2.type());
// no change
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event3 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT));
- verifyNoBroadcastMessage(bcast);
+ verifyNoBroadcastMessage(message);
assertNull("No change event expected", event3);
}
- private void verifyNoBroadcastMessage(Capture<ClusterMessage> bcast) {
- assertFalse("No broadcast expected", bcast.hasCaptured());
+ private <T> void verifyNoBroadcastMessage(Capture<T> message) {
+ assertFalse("No broadcast expected", message.hasCaptured());
}
private void verifyLinkBroadcastMessage(ProviderId providerId,
- NodeId sender,
- ConnectPoint src,
- ConnectPoint dst,
- Type type,
- Capture<ClusterMessage> actualMsg) {
+ NodeId sender,
+ ConnectPoint src,
+ ConnectPoint dst,
+ Type type,
+ Capture<InternalLinkEvent> actualLinkEvent,
+ Capture<MessageSubject> actualSubject,
+ Capture<Function<InternalLinkEvent, byte[]>> actualEncoder) {
verify(clusterCommunicator);
- assertTrue(actualMsg.hasCaptured());
- assertEquals(sender, actualMsg.getValue().sender());
- assertEquals(GossipLinkStoreMessageSubjects.LINK_UPDATE,
- actualMsg.getValue().subject());
- InternalLinkEvent linkEvent
- = GossipLinkStore.SERIALIZER.decode(actualMsg.getValue().payload());
- assertEquals(providerId, linkEvent.providerId());
- assertLinkDescriptionEquals(src, dst, type, linkEvent.linkDescription().value());
-
+ assertTrue(actualLinkEvent.hasCaptured());
+ assertEquals(GossipLinkStoreMessageSubjects.LINK_UPDATE, actualSubject.getValue());
+ assertEquals(providerId, actualLinkEvent.getValue().providerId());
+ assertLinkDescriptionEquals(src, dst, type, actualLinkEvent.getValue().linkDescription().value());
}
private static void assertLinkDescriptionEquals(ConnectPoint src,
@@ -434,31 +429,33 @@
ConnectPoint src = new ConnectPoint(DID1, P1);
ConnectPoint dst = new ConnectPoint(DID2, P2);
- Capture<ClusterMessage> bcast = new Capture<>();
+ Capture<InternalLinkEvent> message = new Capture<>();
+ Capture<MessageSubject> subject = new Capture<>();
+ Capture<Function<InternalLinkEvent, byte[]>> encoder = new Capture<>();
// add Ancillary link
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event = linkStore.createOrUpdateLink(PIDA,
new DefaultLinkDescription(src, dst, INDIRECT, A1));
- verifyLinkBroadcastMessage(PIDA, NID1, src, dst, INDIRECT, bcast);
+ verifyLinkBroadcastMessage(PIDA, NID1, src, dst, INDIRECT, message, subject, encoder);
assertNotNull("Ancillary only link is ignored", event);
// add Primary link
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event2 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, INDIRECT, A2));
- verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, bcast);
+ verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, INDIRECT, event2.subject());
assertAnnotationsEquals(event2.subject().annotations(), A2, A1);
assertEquals(LINK_UPDATED, event2.type());
// update link type
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event3 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT, A2));
- verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, bcast);
+ verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, DIRECT, event3.subject());
assertAnnotationsEquals(event3.subject().annotations(), A2, A1);
@@ -466,38 +463,38 @@
// no change
- resetCommunicatorExpectingNoBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event4 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT));
- verifyNoBroadcastMessage(bcast);
+ verifyNoBroadcastMessage(message);
assertNull("No change event expected", event4);
// update link annotation (Primary)
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event5 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT, A2_2));
- verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, bcast);
+ verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, DIRECT, event5.subject());
assertAnnotationsEquals(event5.subject().annotations(), A2, A2_2, A1);
assertEquals(LINK_UPDATED, event5.type());
// update link annotation (Ancillary)
- resetCommunicatorExpectingSingleBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event6 = linkStore.createOrUpdateLink(PIDA,
new DefaultLinkDescription(src, dst, DIRECT, A1_2));
- verifyLinkBroadcastMessage(PIDA, NID1, src, dst, DIRECT, bcast);
+ verifyLinkBroadcastMessage(PIDA, NID1, src, dst, DIRECT, message, subject, encoder);
assertLink(DID1, P1, DID2, P2, DIRECT, event6.subject());
assertAnnotationsEquals(event6.subject().annotations(), A2, A2_2, A1, A1_2);
assertEquals(LINK_UPDATED, event6.type());
// update link type (Ancillary) : ignored
- resetCommunicatorExpectingNoBroadcast(bcast);
+ resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
LinkEvent event7 = linkStore.createOrUpdateLink(PIDA,
new DefaultLinkDescription(src, dst, EDGE));
- verifyNoBroadcastMessage(bcast);
+ verifyNoBroadcastMessage(message);
assertNull("Ancillary change other than annotation is ignored", event7);
}
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/DecodeTo.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/DecodeTo.java
deleted file mode 100644
index ccbfaa8..0000000
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/DecodeTo.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.serializers;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.base.Function;
-
-/**
- * Function to convert byte[] into {@code T}.
- *
- * @param <T> Type after decoding
- */
-public final class DecodeTo<T> implements Function<byte[], T> {
-
- private StoreSerializer serializer;
-
- public DecodeTo(StoreSerializer serializer) {
- this.serializer = checkNotNull(serializer);
- }
-
- @Override
- public T apply(byte[] input) {
- return serializer.decode(input);
- }
-}
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index 01dda9e..62330fb 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -15,10 +15,10 @@
*/
package org.onlab.util;
-import com.google.common.base.Strings;
-import com.google.common.primitives.UnsignedLongs;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.slf4j.Logger;
+import static java.nio.file.Files.delete;
+import static java.nio.file.Files.walkFileTree;
+import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
+import static org.slf4j.LoggerFactory.getLogger;
import java.io.BufferedReader;
import java.io.File;
@@ -37,12 +37,17 @@
import java.util.Collection;
import java.util.Dictionary;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
-import static java.nio.file.Files.delete;
-import static java.nio.file.Files.walkFileTree;
-import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
-import static org.slf4j.LoggerFactory.getLogger;
+import org.slf4j.Logger;
+
+import com.google.common.base.Strings;
+import com.google.common.primitives.UnsignedLongs;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Miscellaneous utility methods.
@@ -324,6 +329,51 @@
dst.getAbsolutePath()));
}
+ /**
+ * Returns the future value when complete or if future
+ * completes exceptionally returns the defaultValue.
+ * @param future future
+ * @param defaultValue default value
+ * @param <T> future value type
+ * @return future value when complete or if future
+ * completes exceptionally returns the defaultValue.
+ */
+ public static <T> T futureGetOrElse(Future<T> future, T defaultValue) {
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return defaultValue;
+ } catch (ExecutionException e) {
+ return defaultValue;
+ }
+ }
+
+ /**
+ * Returns the future value when complete or if future
+ * completes exceptionally returns the defaultValue.
+ * @param future future
+ * @param timeout time to wait for successful completion
+ * @param timeUnit time unit
+ * @param defaultValue default value
+ * @param <T> future value type
+ * @return future value when complete or if future
+ * completes exceptionally returns the defaultValue.
+ */
+ public static <T> T futureGetOrElse(Future<T> future,
+ long timeout,
+ TimeUnit timeUnit,
+ T defaultValue) {
+ try {
+ return future.get(timeout, timeUnit);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return defaultValue;
+ } catch (ExecutionException | TimeoutException e) {
+ return defaultValue;
+ }
+ }
+
// Auxiliary path visitor for recursive directory structure copying.
private static class DirectoryCopier extends SimpleFileVisitor<Path> {
private Path src;
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
index 12e1d87..d7823d4 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -16,10 +16,9 @@
package org.onlab.netty;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import com.google.common.util.concurrent.ListenableFuture;
-
/**
* Interface for low level messaging primitives.
*/
@@ -40,9 +39,8 @@
* @param type type of message.
* @param payload message payload.
* @return a response future
- * @throws IOException when I/O exception of some sort has occurred
*/
- public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
+ public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
/**
* Registers a new message handler for message type.
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 099880d..f96bb0b 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -39,6 +39,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -56,8 +57,6 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
/**
* A Netty based implementation of MessagingService.
@@ -69,14 +68,14 @@
private final Endpoint localEp;
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
- private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
+ private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.expireAfterWrite(10, TimeUnit.SECONDS)
- .removalListener(new RemovalListener<Long, SettableFuture<byte[]>>() {
+ .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
@Override
- public void onRemoval(RemovalNotification<Long, SettableFuture<byte[]>> entry) {
+ public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
if (entry.wasEvicted()) {
- entry.getValue().setException(new TimeoutException("Timedout waiting for reply"));
+ entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
}
}
})
@@ -178,11 +177,10 @@
}
@Override
- public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload)
- throws IOException {
- SettableFuture<byte[]> futureResponse = SettableFuture.create();
+ public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
+ CompletableFuture<byte[]> response = new CompletableFuture<>();
Long messageId = messageIdGenerator.incrementAndGet();
- responseFutures.put(messageId, futureResponse);
+ responseFutures.put(messageId, response);
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageId)
.withSender(localEp)
@@ -193,9 +191,9 @@
sendAsync(ep, message);
} catch (Exception e) {
responseFutures.invalidate(messageId);
- throw e;
+ response.completeExceptionally(e);
}
- return futureResponse;
+ return response;
}
@Override
@@ -333,10 +331,10 @@
String type = message.type();
if (InternalMessage.REPLY_MESSAGE_TYPE.equals(type)) {
try {
- SettableFuture<byte[]> futureResponse =
+ CompletableFuture<byte[]> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
- futureResponse.set(message.payload());
+ futureResponse.complete(message.payload());
} else {
log.warn("Received a reply for message id:[{}]. "
+ " from {}. But was unable to locate the"