convert anonymous class to nested class

Change-Id: I2d0770b80ca4806fabf31fd358ecb165d3e9f778
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 7a917a7..bd2742a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -74,12 +74,13 @@
 import org.onlab.onos.store.flow.ReplicaInfoService;
 import org.onlab.onos.store.hz.AbstractHazelcastStore;
 import org.onlab.onos.store.hz.SMap;
+import org.onlab.onos.store.serializers.DecodeTo;
 import org.onlab.onos.store.serializers.DistributedStoreSerializers;
 import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.StoreSerializer;
 import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 
-import com.google.common.base.Function;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -146,7 +147,7 @@
     // TODO make this configurable
     private boolean syncBackup = false;
 
-    protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+    protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
         @Override
         protected void setupKryoPool() {
             serializerPool = KryoNamespace.newBuilder()
@@ -175,50 +176,7 @@
 
         final NodeId local = clusterService.getLocalNode().id();
 
-        clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
-
-            @Override
-            public void handle(final ClusterMessage message) {
-                FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
-                log.info("received batch request {}", operation);
-
-                final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
-                ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
-                if (!local.equals(replicaInfo.master().orNull())) {
-
-                    Set<FlowRule> failures = new HashSet<>(operation.size());
-                    for (FlowRuleBatchEntry op : operation.getOperations()) {
-                        failures.add(op.getTarget());
-                    }
-                    CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
-                    // This node is no longer the master, respond as all failed.
-                    // TODO: we might want to wrap response in envelope
-                    // to distinguish sw programming failure and hand over
-                    // it make sense in the latter case to retry immediately.
-                    try {
-                        message.respond(SERIALIZER.encode(allFailed));
-                    } catch (IOException e) {
-                        log.error("Failed to respond back", e);
-                    }
-                    return;
-                }
-
-                final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
-
-                f.addListener(new Runnable() {
-
-                    @Override
-                    public void run() {
-                         CompletedBatchOperation result = Futures.getUnchecked(f);
-                        try {
-                            message.respond(SERIALIZER.encode(result));
-                        } catch (IOException e) {
-                            log.error("Failed to respond back", e);
-                        }
-                    }
-                }, futureListeners);
-            }
-        });
+        clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
 
         clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
 
@@ -400,12 +358,7 @@
         try {
             ListenableFuture<byte[]> responseFuture =
                     clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return Futures.transform(responseFuture, new Function<byte[], CompletedBatchOperation>() {
-                @Override
-                public CompletedBatchOperation apply(byte[] input) {
-                    return SERIALIZER.decode(input);
-                }
-            });
+            return Futures.transform(responseFuture, new DecodeTo<CompletedBatchOperation>(SERIALIZER));
         } catch (IOException e) {
             return Futures.immediateFailedFuture(e);
         }
@@ -583,6 +536,56 @@
         log.debug("removedFromPrimary {}", removed);
     }
 
+    private final class OnStoreBatch implements ClusterMessageHandler {
+        private final NodeId local;
+
+        private OnStoreBatch(NodeId local) {
+            this.local = local;
+        }
+
+        @Override
+        public void handle(final ClusterMessage message) {
+            FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
+            log.info("received batch request {}", operation);
+
+            final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
+            ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+            if (!local.equals(replicaInfo.master().orNull())) {
+
+                Set<FlowRule> failures = new HashSet<>(operation.size());
+                for (FlowRuleBatchEntry op : operation.getOperations()) {
+                    failures.add(op.getTarget());
+                }
+                CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
+                // This node is no longer the master, respond as all failed.
+                // TODO: we might want to wrap response in envelope
+                // to distinguish sw programming failure and hand over
+                // it make sense in the latter case to retry immediately.
+                try {
+                    message.respond(SERIALIZER.encode(allFailed));
+                } catch (IOException e) {
+                    log.error("Failed to respond back", e);
+                }
+                return;
+            }
+
+            final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
+
+            f.addListener(new Runnable() {
+
+                @Override
+                public void run() {
+                     CompletedBatchOperation result = Futures.getUnchecked(f);
+                    try {
+                        message.respond(SERIALIZER.encode(result));
+                    } catch (IOException e) {
+                        log.error("Failed to respond back", e);
+                    }
+                }
+            }, futureListeners);
+        }
+    }
+
     private final class SMapLoader
         extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
 
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DecodeTo.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DecodeTo.java
new file mode 100644
index 0000000..a8b0379
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DecodeTo.java
@@ -0,0 +1,24 @@
+package org.onlab.onos.store.serializers;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Function;
+
+/**
+ * Function to convert byte[] into {@code T}.
+ *
+ * @param <T> Type after decoding
+ */
+public final class DecodeTo<T> implements Function<byte[], T> {
+
+    private StoreSerializer serializer;
+
+    public DecodeTo(StoreSerializer serializer) {
+        this.serializer = checkNotNull(serializer);
+    }
+
+    @Override
+    public T apply(byte[] input) {
+        return serializer.decode(input);
+    }
+}
\ No newline at end of file