Revamped ClusterCommunicationService API
Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
index 708faf6..b609a3c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowext/impl/DefaultFlowRuleExtRouter.java
@@ -20,6 +20,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -45,7 +46,6 @@
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
-import org.onosproject.store.serializers.DecodeTo;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
@@ -199,18 +199,12 @@
log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
- ClusterMessage message = new ClusterMessage(clusterService
- .getLocalNode().id(), APPLY_EXTEND_FLOWS, SERIALIZER.encode(batchOperation));
-
- try {
- ListenableFuture<byte[]> responseFuture = clusterCommunicator
- .sendAndReceive(message, replicaInfo.master().get());
- // here should add another decode process
- return Futures.transform(responseFuture,
- new DecodeTo<FlowExtCompletedOperation>(SERIALIZER));
- } catch (IOException e) {
- return Futures.immediateFailedFuture(e);
- }
+ return clusterCommunicator.sendAndReceive(
+ batchOperation,
+ APPLY_EXTEND_FLOWS,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master().get());
}
/**