Revamped ClusterCommunicationService API
Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 781e24b..f5807e2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -22,6 +22,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
import com.hazelcast.core.IMap;
import org.apache.felix.scr.annotations.Activate;
@@ -35,6 +36,7 @@
import org.onlab.util.BoundedThreadPool;
import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
+import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
@@ -93,7 +95,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -360,22 +361,16 @@
log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), rule.deviceId());
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- FlowStoreMessageSubjects.GET_FLOW_ENTRY,
- SERIALIZER.encode(rule));
-
- try {
- Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
- log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
- }
- return null;
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
+ FlowStoreMessageSubjects.GET_FLOW_ENTRY,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master().get()),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ null);
}
-
-
@Override
public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
@@ -393,22 +388,16 @@
log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- GET_DEVICE_FLOW_ENTRIES,
- SERIALIZER.encode(deviceId));
-
- try {
- Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
- log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
- }
- return Collections.emptyList();
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
+ FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master().get()),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptyList());
}
-
-
@Override
public void storeFlowRule(FlowRule rule) {
storeBatch(new FlowRuleBatchOperation(
@@ -453,14 +442,10 @@
log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
- ClusterMessage message = new ClusterMessage(
- local,
- APPLY_BATCH_FLOWS,
- SERIALIZER.encode(operation));
-
-
- if (!clusterCommunicator.unicast(message, replicaInfo.master().get())) {
- log.warn("Failed to storeBatch: {} to {}", message, replicaInfo.master());
+ if (!clusterCommunicator.unicast(operation,
+ APPLY_BATCH_FLOWS, SERIALIZER::encode,
+ replicaInfo.master().get())) {
+ log.warn("Failed to storeBatch: {} to {}", operation, replicaInfo.master());
Set<FlowRule> allFailures = operation.getOperations().stream()
.map(op -> op.target())
@@ -612,18 +597,15 @@
log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- REMOVE_FLOW_ENTRY,
- SERIALIZER.encode(rule));
-
- try {
- Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
- } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
- // TODO: Retry against latest master or throw a FlowStoreException
- throw new RuntimeException(e);
- }
+ return Futures.get(clusterCommunicator.sendAndReceive(
+ rule,
+ REMOVE_FLOW_ENTRY,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master().get()),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ RuntimeException.class);
}
private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
@@ -649,12 +631,8 @@
if (nodeId == null) {
notifyDelegate(event);
} else {
- ClusterMessage message = new ClusterMessage(
- clusterService.getLocalNode().id(),
- REMOTE_APPLY_COMPLETED,
- SERIALIZER.encode(event));
// TODO check unicast return value
- clusterCommunicator.unicast(message, nodeId);
+ clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
//error log: log.warn("Failed to respond to peer for batch operation result");
}
}