remote stats service functional

Change-Id: I4ebc4c565b7ee7533b5bd1a0379f54470223ba0e
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
index 03ae01b..273e3cc 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/statistic/impl/DistributedStatisticStore.java
@@ -3,8 +3,7 @@
 import static org.onlab.onos.store.statistic.impl.StatisticStoreMessageSubjects.*;
 import static org.slf4j.LoggerFactory.getLogger;
 
-import com.google.common.collect.ImmutableSet;
-
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -130,7 +129,7 @@
     }
 
     @Override
-    public void removeFromStatistics(FlowRule rule) {
+    public synchronized void removeFromStatistics(FlowRule rule) {
         ConnectPoint cp = buildConnectPoint(rule);
         if (cp == null) {
             return;
@@ -139,6 +138,15 @@
         if (rep != null) {
             rep.remove(rule);
         }
+        Set<FlowEntry> values = current.get(cp);
+        if (values != null) {
+            values.remove(rule);
+        }
+        values = previous.get(cp);
+        if (values != null) {
+            values.remove(rule);
+        }
+
     }
 
     @Override
@@ -181,7 +189,7 @@
                 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
                                                       TimeUnit.MILLISECONDS));
             } catch (IOException | TimeoutException e) {
-                // FIXME: throw a FlowStoreException
+                // FIXME: throw a StatsStoreException
                 throw new RuntimeException(e);
             }
         }
@@ -200,7 +208,7 @@
         } else {
             ClusterMessage message = new ClusterMessage(
                     clusterService.getLocalNode().id(),
-                    GET_CURRENT,
+                    GET_PREVIOUS,
                     SERIALIZER.encode(connectPoint));
 
             try {
@@ -209,7 +217,7 @@
                 return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
                                                       TimeUnit.MILLISECONDS));
             } catch (IOException | TimeoutException e) {
-                // FIXME: throw a FlowStoreException
+                // FIXME: throw a StatsStoreException
                 throw new RuntimeException(e);
             }
         }
@@ -283,7 +291,7 @@
 
         public synchronized Set<FlowEntry> get() {
             counter.set(rules.size());
-            return ImmutableSet.copyOf(rules);
+            return Sets.newHashSet(rules);
         }
 
 
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
index dc0eaa8..9b75cea 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
@@ -31,6 +31,7 @@
 import org.onlab.onos.net.flow.DefaultFlowRule;
 import org.onlab.onos.net.flow.DefaultTrafficSelector;
 import org.onlab.onos.net.flow.DefaultTrafficTreatment;
+import org.onlab.onos.net.flow.FlowEntry;
 import org.onlab.onos.net.flow.FlowId;
 import org.onlab.onos.net.flow.criteria.Criteria;
 import org.onlab.onos.net.flow.criteria.Criterion;
@@ -98,6 +99,7 @@
                     DefaultHostDescription.class,
                     DefaultFlowRule.class,
                     DefaultFlowEntry.class,
+                    FlowEntry.FlowEntryState.class,
                     FlowId.class,
                     DefaultTrafficSelector.class,
                     Criteria.PortCriterion.class,