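Revamp ClusterCommunicationService API

In DistributedFlowRuleStore, callers no longer build a ClusterMessage by
hand. They pass the payload, the message subject, and the encoder (plus a
decoder for sendAndReceive) directly to unicast/sendAndReceive. Waiting on
the reply now goes through Tools.futureGetOrElse (with a fallback value)
or Guava's Futures.get instead of an explicit try/catch around Future.get.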

Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 781e24b..f5807e2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
 import com.hazelcast.core.IMap;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -35,6 +36,7 @@
 import org.onlab.util.BoundedThreadPool;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.NewConcurrentHashMap;
+import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
@@ -93,7 +95,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -360,22 +361,16 @@
         log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
                   replicaInfo.master().orNull(), rule.deviceId());
 
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                FlowStoreMessageSubjects.GET_FLOW_ENTRY,
-                SERIALIZER.encode(rule));
-
-        try {
-            Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
-            log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
-        }
-        return null;
+        return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
+                                    FlowStoreMessageSubjects.GET_FLOW_ENTRY,
+                                    SERIALIZER::encode,
+                                    SERIALIZER::decode,
+                                    replicaInfo.master().get()),
+                               FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                               TimeUnit.MILLISECONDS,
+                               null);
     }
 
-
-
     @Override
     public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
 
@@ -393,22 +388,16 @@
         log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
                   replicaInfo.master().orNull(), deviceId);
 
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                GET_DEVICE_FLOW_ENTRIES,
-                SERIALIZER.encode(deviceId));
-
-        try {
-            Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
-            log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
-        }
-        return Collections.emptyList();
+        return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
+                                    FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
+                                    SERIALIZER::encode,
+                                    SERIALIZER::decode,
+                                    replicaInfo.master().get()),
+                               FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                               TimeUnit.MILLISECONDS,
+                               Collections.emptyList());
     }
 
-
-
     @Override
     public void storeFlowRule(FlowRule rule) {
         storeBatch(new FlowRuleBatchOperation(
@@ -453,14 +442,10 @@
         log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
                   replicaInfo.master().orNull(), deviceId);
 
-        ClusterMessage message = new ClusterMessage(
-                local,
-                APPLY_BATCH_FLOWS,
-                SERIALIZER.encode(operation));
-
-
-        if (!clusterCommunicator.unicast(message, replicaInfo.master().get())) {
-            log.warn("Failed to storeBatch: {} to {}", message, replicaInfo.master());
+        if (!clusterCommunicator.unicast(operation,
+                APPLY_BATCH_FLOWS, SERIALIZER::encode,
+                replicaInfo.master().get())) {
+            log.warn("Failed to storeBatch: {} to {}", operation, replicaInfo.master());
 
             Set<FlowRule> allFailures = operation.getOperations().stream()
                     .map(op -> op.target())
@@ -612,18 +597,15 @@
         log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
                   replicaInfo.master().orNull(), deviceId);
 
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                REMOVE_FLOW_ENTRY,
-                SERIALIZER.encode(rule));
-
-        try {
-            Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
-            // TODO: Retry against latest master or throw a FlowStoreException
-            throw new RuntimeException(e);
-        }
+        return Futures.get(clusterCommunicator.sendAndReceive(
+                               rule,
+                               REMOVE_FLOW_ENTRY,
+                               SERIALIZER::encode,
+                               SERIALIZER::decode,
+                               replicaInfo.master().get()),
+                           FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                           TimeUnit.MILLISECONDS,
+                           RuntimeException.class);
     }
 
     private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
@@ -649,12 +631,8 @@
         if (nodeId == null) {
             notifyDelegate(event);
         } else {
-            ClusterMessage message = new ClusterMessage(
-                    clusterService.getLocalNode().id(),
-                    REMOTE_APPLY_COMPLETED,
-                    SERIALIZER.encode(event));
             // TODO check unicast return value
-            clusterCommunicator.unicast(message, nodeId);
+            clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
             //error log: log.warn("Failed to respond to peer for batch operation result");
         }
     }