convert anonymous class to nested class
Change-Id: I2d0770b80ca4806fabf31fd358ecb165d3e9f778
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 7a917a7..bd2742a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -74,12 +74,13 @@
import org.onlab.onos.store.flow.ReplicaInfoService;
import org.onlab.onos.store.hz.AbstractHazelcastStore;
import org.onlab.onos.store.hz.SMap;
+import org.onlab.onos.store.serializers.DecodeTo;
import org.onlab.onos.store.serializers.DistributedStoreSerializers;
import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.StoreSerializer;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
-import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@@ -146,7 +147,7 @@
// TODO make this configurable
private boolean syncBackup = false;
- protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
@@ -175,50 +176,7 @@
final NodeId local = clusterService.getLocalNode().id();
- clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
-
- @Override
- public void handle(final ClusterMessage message) {
- FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
- log.info("received batch request {}", operation);
-
- final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
- ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
- if (!local.equals(replicaInfo.master().orNull())) {
-
- Set<FlowRule> failures = new HashSet<>(operation.size());
- for (FlowRuleBatchEntry op : operation.getOperations()) {
- failures.add(op.getTarget());
- }
- CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
- // This node is no longer the master, respond as all failed.
- // TODO: we might want to wrap response in envelope
- // to distinguish sw programming failure and hand over
- // it make sense in the latter case to retry immediately.
- try {
- message.respond(SERIALIZER.encode(allFailed));
- } catch (IOException e) {
- log.error("Failed to respond back", e);
- }
- return;
- }
-
- final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
-
- f.addListener(new Runnable() {
-
- @Override
- public void run() {
- CompletedBatchOperation result = Futures.getUnchecked(f);
- try {
- message.respond(SERIALIZER.encode(result));
- } catch (IOException e) {
- log.error("Failed to respond back", e);
- }
- }
- }, futureListeners);
- }
- });
+ clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
@@ -400,12 +358,7 @@
try {
ListenableFuture<byte[]> responseFuture =
clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
- return Futures.transform(responseFuture, new Function<byte[], CompletedBatchOperation>() {
- @Override
- public CompletedBatchOperation apply(byte[] input) {
- return SERIALIZER.decode(input);
- }
- });
+ return Futures.transform(responseFuture, new DecodeTo<CompletedBatchOperation>(SERIALIZER));
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
@@ -583,6 +536,56 @@
log.debug("removedFromPrimary {}", removed);
}
+ private final class OnStoreBatch implements ClusterMessageHandler {
+ private final NodeId local;
+
+ private OnStoreBatch(NodeId local) {
+ this.local = local;
+ }
+
+ @Override
+ public void handle(final ClusterMessage message) {
+ FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
+ log.info("received batch request {}", operation);
+
+ final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
+ ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+ if (!local.equals(replicaInfo.master().orNull())) {
+
+ Set<FlowRule> failures = new HashSet<>(operation.size());
+ for (FlowRuleBatchEntry op : operation.getOperations()) {
+ failures.add(op.getTarget());
+ }
+ CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
+ // This node is no longer the master, respond as all failed.
+ // TODO: we might want to wrap response in envelope
+ // to distinguish sw programming failure and hand over
+ // it make sense in the latter case to retry immediately.
+ try {
+ message.respond(SERIALIZER.encode(allFailed));
+ } catch (IOException e) {
+ log.error("Failed to respond back", e);
+ }
+ return;
+ }
+
+ final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
+
+ f.addListener(new Runnable() {
+
+ @Override
+ public void run() {
+ CompletedBatchOperation result = Futures.getUnchecked(f);
+ try {
+ message.respond(SERIALIZER.encode(result));
+ } catch (IOException e) {
+ log.error("Failed to respond back", e);
+ }
+ }
+ }, futureListeners);
+ }
+ }
+
private final class SMapLoader
extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DecodeTo.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DecodeTo.java
new file mode 100644
index 0000000..a8b0379
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DecodeTo.java
@@ -0,0 +1,24 @@
+package org.onlab.onos.store.serializers;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Function;
+
+/**
+ * Function to convert byte[] into {@code T}.
+ *
+ * @param <T> Type after decoding
+ */
+public final class DecodeTo<T> implements Function<byte[], T> {
+
+ private StoreSerializer serializer;
+
+ public DecodeTo(StoreSerializer serializer) {
+ this.serializer = checkNotNull(serializer);
+ }
+
+ @Override
+ public T apply(byte[] input) {
+ return serializer.decode(input);
+ }
+}
\ No newline at end of file