Revamped ClusterCommunicationService API

Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
index 708faf6..b609a3c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
@@ -20,6 +20,7 @@
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -45,7 +46,6 @@
 import org.onosproject.store.flow.ReplicaInfo;
 import org.onosproject.store.flow.ReplicaInfoEventListener;
 import org.onosproject.store.flow.ReplicaInfoService;
-import org.onosproject.store.serializers.DecodeTo;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.serializers.StoreSerializer;
 import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
@@ -199,18 +199,12 @@
         log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
                   replicaInfo.master().orNull(), deviceId);
 
-        ClusterMessage message = new ClusterMessage(clusterService
-                    .getLocalNode().id(), APPLY_EXTEND_FLOWS, SERIALIZER.encode(batchOperation));
-
-        try {
-            ListenableFuture<byte[]> responseFuture = clusterCommunicator
-                    .sendAndReceive(message, replicaInfo.master().get());
-            // here should add another decode process
-            return Futures.transform(responseFuture,
-                                     new DecodeTo<FlowExtCompletedOperation>(SERIALIZER));
-        } catch (IOException e) {
-            return Futures.immediateFailedFuture(e);
-        }
+        return clusterCommunicator.sendAndReceive(
+                batchOperation,
+                APPLY_EXTEND_FLOWS,
+                SERIALIZER::encode,
+                SERIALIZER::decode,
+                replicaInfo.master().get());
     }
 
     /**