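Use non-static serializers in DistributedFlowRuleStore

Replace the static SERIALIZER and SERIALIZER_BUILDER fields with the
per-instance fields serializer and serializerBuilder. The message
serializer is now created with Serializer.using(KryoNamespaces.API)
instead of a StoreSerializer built on a custom "FlowRuleStore" Kryo
namespace.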

Change-Id: Ifacd9ca98d8c6d3bbf03b3b9784234f7eab458a5
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index dab63c0..2d864f9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -68,8 +68,6 @@
 import org.onosproject.store.flow.ReplicaInfoService;
 import org.onosproject.store.impl.MastershipBasedTimestamp;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.StoreSerializer;
-import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMapEvent;
 import org.onosproject.store.service.EventuallyConsistentMapListener;
@@ -183,13 +181,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
 
-    protected static final StoreSerializer SERIALIZER = StoreSerializer.using(
-            KryoNamespace.newBuilder()
-                    .register(DistributedStoreSerializers.STORE_COMMON)
-                    .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
-                    .build("FlowRuleStore"));
+    protected final Serializer serializer = Serializer.using(KryoNamespaces.API);
 
-    protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
+    protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
             .register(KryoNamespaces.API)
             .register(MastershipBasedTimestamp.class);
 
@@ -223,7 +217,7 @@
 
         deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
                 .withName("onos-flow-table-stats")
-                .withSerializer(SERIALIZER_BUILDER)
+                .withSerializer(serializerBuilder)
                 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
                 .withTombstonesDisabled()
@@ -331,17 +325,17 @@
 
         clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
         clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
-                REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
+                REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
         clusterCommunicator.addSubscriber(
-                GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
+                GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
         clusterCommunicator.addSubscriber(
-                GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
+                GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
         clusterCommunicator.addSubscriber(
-                REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
+                REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
         clusterCommunicator.addSubscriber(
-                REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
+                REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
         clusterCommunicator.addSubscriber(
-                FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, SERIALIZER::encode, executor);
+                FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackupReceipt, serializer::encode, executor);
     }
 
     private void unregisterMessageHandlers() {
@@ -386,8 +380,8 @@
 
         return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
                                     FlowStoreMessageSubjects.GET_FLOW_ENTRY,
-                                    SERIALIZER::encode,
-                                    SERIALIZER::decode,
+                                    serializer::encode,
+                                    serializer::decode,
                                     master),
                                FLOW_RULE_STORE_TIMEOUT_MILLIS,
                                TimeUnit.MILLISECONDS,
@@ -412,8 +406,8 @@
 
         return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
                                     FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
-                                    SERIALIZER::encode,
-                                    SERIALIZER::decode,
+                                    serializer::encode,
+                                    serializer::decode,
                                     master),
                                FLOW_RULE_STORE_TIMEOUT_MILLIS,
                                TimeUnit.MILLISECONDS,
@@ -460,7 +454,7 @@
 
         clusterCommunicator.unicast(operation,
                                     APPLY_BATCH_FLOWS,
-                                    SERIALIZER::encode,
+                                    serializer::encode,
                                     master)
                            .whenComplete((result, error) -> {
                                if (error != null) {
@@ -607,8 +601,8 @@
         return Futures.getUnchecked(clusterCommunicator.sendAndReceive(
                                rule,
                                REMOVE_FLOW_ENTRY,
-                               SERIALIZER::encode,
-                               SERIALIZER::decode,
+                               serializer::encode,
+                               serializer::decode,
                                master));
     }
 
@@ -633,7 +627,7 @@
             notifyDelegate(event);
         } else {
             // TODO check unicast return value
-            clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
+            clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
             //error log: log.warn("Failed to respond to peer for batch operation result");
         }
     }
@@ -642,7 +636,7 @@
 
         @Override
         public void handle(final ClusterMessage message) {
-            FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
+            FlowRuleBatchOperation operation = serializer.decode(message.payload());
             log.debug("received batch request {}", operation);
 
             final DeviceId deviceId = operation.deviceId();
@@ -657,7 +651,7 @@
                 // TODO: we might want to wrap response in envelope
                 // to distinguish sw programming failure and hand over
                 // it make sense in the latter case to retry immediately.
-                message.respond(SERIALIZER.encode(allFailed));
+                message.respond(serializer.encode(allFailed));
                 return;
             }
 
@@ -736,8 +730,8 @@
                                  Set<DeviceId>>
                     sendAndReceive(deviceFlowEntries,
                                    FLOW_TABLE_BACKUP,
-                                   SERIALIZER::encode,
-                                   SERIALIZER::decode,
+                                   serializer::encode,
+                                   serializer::decode,
                                    nodeId)
                     .whenComplete((backedupDevices, error) -> {
                         Set<DeviceId> devicesNotBackedup = error != null ?
@@ -769,12 +763,12 @@
                         .withSerializer(new Serializer() {
                             @Override
                             public <T> byte[] encode(T object) {
-                                return SERIALIZER.encode(object);
+                                return serializer.encode(object);
                             }
 
                             @Override
                             public <T> T decode(byte[] bytes) {
-                                return SERIALIZER.decode(bytes);
+                                return serializer.decode(bytes);
                             }
                         })
                         .build());