CORD-13:Table Statistics support along with CLI and REST

Change-Id: Ic7facc73754c4b1e7c9c5a9eecd217f89a67e135
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index de7a3ac..8cd63e7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.flow.impl;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
@@ -57,6 +58,7 @@
 import org.onosproject.net.flow.FlowRuleStore;
 import org.onosproject.net.flow.FlowRuleStoreDelegate;
 import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.net.flow.TableStatisticsEntry;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.ClusterMessage;
@@ -64,9 +66,16 @@
 import org.onosproject.store.flow.ReplicaInfoEvent;
 import org.onosproject.store.flow.ReplicaInfoEventListener;
 import org.onosproject.store.flow.ReplicaInfoService;
+import org.onosproject.store.impl.MastershipBasedTimestamp;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.serializers.StoreSerializer;
 import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
@@ -151,6 +160,13 @@
     private final ScheduledExecutorService backupSenderExecutor =
             Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
 
+    private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
+    private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
+            new InternalTableStatsListener();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
     protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
         @Override
         protected void setupKryoPool() {
@@ -161,6 +177,11 @@
         }
     };
 
+    protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(MastershipBasedTimestamp.class);
+
+
     private IdGenerator idGenerator;
     private NodeId local;
 
@@ -186,6 +207,15 @@
                     TimeUnit.MILLISECONDS);
         }
 
+        deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
+                .withName("onos-flow-table-stats")
+                .withSerializer(SERIALIZER_BUILDER)
+                .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .withTombstonesDisabled()
+                .build();
+        deviceTableStats.addListener(tableStatsListener);
+
         logConfig("Started");
     }
 
@@ -197,6 +227,8 @@
         }
         configService.unregisterProperties(getClass(), false);
         unregisterMessageHandlers();
+        deviceTableStats.removeListener(tableStatsListener);
+        deviceTableStats.destroy();
         messageHandlingExecutor.shutdownNow();
         backupSenderExecutor.shutdownNow();
         log.info("Stopped");
@@ -786,4 +818,36 @@
             return backedupDevices;
         }
     }
+
+    @Override
+    public FlowRuleEvent updateTableStatistics(DeviceId deviceId,
+                                               List<TableStatisticsEntry> tableStats) {
+        deviceTableStats.put(deviceId, tableStats);
+        return null;
+    }
+
+    @Override
+    public Iterable<TableStatisticsEntry> getTableStatistics(DeviceId deviceId) {
+        NodeId master = mastershipService.getMasterFor(deviceId);
+
+        if (master == null) {
+            log.debug("Failed to getTableStats: No master for {}", deviceId);
+            return Collections.emptyList();
+        }
+
+        List<TableStatisticsEntry> tableStats = deviceTableStats.get(deviceId);
+        if (tableStats == null) {
+            return Collections.emptyList();
+        }
+        return ImmutableList.copyOf(tableStats);
+    }
+
+    private class InternalTableStatsListener
+        implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
+        @Override
+        public void event(EventuallyConsistentMapEvent<DeviceId,
+                          List<TableStatisticsEntry>> event) {
+            //TODO: Generate an event to listeners (do we need?)
+        }
+    }
 }