Using non-static serializers in Flow Rule Store
Change-Id: Ifacd9ca98d8c6d3bbf03b3b9784234f7eab458a5
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index dab63c0..2d864f9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -68,8 +68,6 @@
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.StoreSerializer;
-import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
@@ -183,13 +181,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
- protected static final StoreSerializer SERIALIZER = StoreSerializer.using(
- KryoNamespace.newBuilder()
- .register(DistributedStoreSerializers.STORE_COMMON)
- .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
- .build("FlowRuleStore"));
+ protected final Serializer serializer = Serializer.using(KryoNamespaces.API);
- protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
+ protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(MastershipBasedTimestamp.class);
@@ -223,7 +217,7 @@
deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
.withName("onos-flow-table-stats")
- .withSerializer(SERIALIZER_BUILDER)
+ .withSerializer(serializerBuilder)
.withAntiEntropyPeriod(5, TimeUnit.SECONDS)
.withTimestampProvider((k, v) -> new WallClockTimestamp())
.withTombstonesDisabled()
@@ -331,17 +325,17 @@
clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
- REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
+ REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
clusterCommunicator.addSubscriber(
- GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
+ GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
clusterCommunicator.addSubscriber(
- GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
+ GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
clusterCommunicator.addSubscriber(
- REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
+ REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
clusterCommunicator.addSubscriber(
- REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
+ REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
clusterCommunicator.addSubscriber(
- FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, SERIALIZER::encode, executor);
+ FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackupReceipt, serializer::encode, executor);
}
private void unregisterMessageHandlers() {
@@ -386,8 +380,8 @@
return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
FlowStoreMessageSubjects.GET_FLOW_ENTRY,
- SERIALIZER::encode,
- SERIALIZER::decode,
+ serializer::encode,
+ serializer::decode,
master),
FLOW_RULE_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS,
@@ -412,8 +406,8 @@
return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
- SERIALIZER::encode,
- SERIALIZER::decode,
+ serializer::encode,
+ serializer::decode,
master),
FLOW_RULE_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS,
@@ -460,7 +454,7 @@
clusterCommunicator.unicast(operation,
APPLY_BATCH_FLOWS,
- SERIALIZER::encode,
+ serializer::encode,
master)
.whenComplete((result, error) -> {
if (error != null) {
@@ -607,8 +601,8 @@
return Futures.getUnchecked(clusterCommunicator.sendAndReceive(
rule,
REMOVE_FLOW_ENTRY,
- SERIALIZER::encode,
- SERIALIZER::decode,
+ serializer::encode,
+ serializer::decode,
master));
}
@@ -633,7 +627,7 @@
notifyDelegate(event);
} else {
// TODO check unicast return value
- clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
+ clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
//error log: log.warn("Failed to respond to peer for batch operation result");
}
}
@@ -642,7 +636,7 @@
@Override
public void handle(final ClusterMessage message) {
- FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
+ FlowRuleBatchOperation operation = serializer.decode(message.payload());
log.debug("received batch request {}", operation);
final DeviceId deviceId = operation.deviceId();
@@ -657,7 +651,7 @@
// TODO: we might want to wrap response in envelope
// to distinguish sw programming failure and hand over
// it make sense in the latter case to retry immediately.
- message.respond(SERIALIZER.encode(allFailed));
+ message.respond(serializer.encode(allFailed));
return;
}
@@ -736,8 +730,8 @@
Set<DeviceId>>
sendAndReceive(deviceFlowEntries,
FLOW_TABLE_BACKUP,
- SERIALIZER::encode,
- SERIALIZER::decode,
+ serializer::encode,
+ serializer::decode,
nodeId)
.whenComplete((backedupDevices, error) -> {
Set<DeviceId> devicesNotBackedup = error != null ?
@@ -769,12 +763,12 @@
.withSerializer(new Serializer() {
@Override
public <T> byte[] encode(T object) {
- return SERIALIZER.encode(object);
+ return serializer.encode(object);
}
@Override
public <T> T decode(byte[] bytes) {
- return SERIALIZER.decode(bytes);
+ return serializer.decode(bytes);
}
})
.build());