Optimize filtering flow rule counts by state

Change-Id: I1e1d80b8e618743ee449162a081cf42e71c99abf
diff --git a/cli/src/main/java/org/onosproject/cli/net/FlowsListCommand.java b/cli/src/main/java/org/onosproject/cli/net/FlowsListCommand.java
index 098c9f0..9717a5c 100644
--- a/cli/src/main/java/org/onosproject/cli/net/FlowsListCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/FlowsListCommand.java
@@ -126,12 +126,15 @@
         compilePredicate();
 
         if (countOnly && !suppressCoreOutput && filter.isEmpty() && remove == null) {
-            if (uri == null) {
+            if (state == null && uri == null) {
                 deviceService.getDevices().forEach(device -> printCount(device, service));
+            } else if (uri == null) {
+                deviceService.getDevices()
+                        .forEach(device -> printCount(device, FlowEntryState.valueOf(state.toUpperCase()), service));
             } else {
                 Device device = deviceService.getDevice(DeviceId.deviceId(uri));
                 if (device != null) {
-                    printCount(device, service);
+                    printCount(device, FlowEntryState.valueOf(state.toUpperCase()), service);
                 }
             }
             return;
@@ -300,6 +303,10 @@
         print("deviceId=%s, flowRuleCount=%d", device.id(), flowRuleService.getFlowRuleCount(device.id()));
     }
 
+    private void printCount(Device device, FlowEntryState state, FlowRuleService flowRuleService) {
+        print("deviceId=%s, flowRuleCount=%d", device.id(), flowRuleService.getFlowRuleCount(device.id(), state));
+    }
+
     /**
      * Prints flows.
      *
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
index af9e57f..78aef84 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
@@ -53,6 +53,17 @@
     }
 
     /**
+     * Returns the number of flow rules in the given state for the given device.
+     *
+     * @param deviceId the device identifier
+     * @param state the state for which to count flow rules
+     * @return number of flow rules in the given state for the given device
+     */
+    default int getFlowRuleCount(DeviceId deviceId, FlowEntry.FlowEntryState state) {
+        return 0;
+    }
+
+    /**
      * Returns the collection of flow entries applied on the specified device.
      * This will include flow rules which may not yet have been applied to
      * the device.
@@ -162,7 +173,9 @@
      *
      * @param deviceId device identifier
      * @return number of flow rules in ADDED state
+     * @deprecated since 2.1
      */
+    @Deprecated
     default long getActiveFlowRuleCount(DeviceId deviceId) {
         return 0;
     }
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
index 449a1fd..07493a8 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
@@ -45,6 +45,17 @@
     }
 
     /**
+     * Returns the number of flow rules in the given state for the given device.
+     *
+     * @param deviceId the device identifier
+     * @param state the state for which to count flow rules
+     * @return number of flow rules in the given state for the given device
+     */
+    default int getFlowRuleCount(DeviceId deviceId, FlowEntry.FlowEntryState state) {
+        return 0;
+    }
+
+    /**
      * Returns the stored flow.
      *
      * @param rule the rule to look for
@@ -154,6 +165,8 @@
      *
      * @param deviceId the device ID
      * @return number of flow rules in ADDED state
+     * @deprecated since 2.1
      */
+    @Deprecated
     long getActiveFlowRuleCount(DeviceId deviceId);
 }
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index 99ee528..d9bec9e 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -258,6 +258,13 @@
     }
 
     @Override
+    public int getFlowRuleCount(DeviceId deviceId, FlowEntry.FlowEntryState state) {
+        checkPermission(FLOWRULE_READ);
+        checkNotNull(deviceId, "Device ID cannot be null");
+        return store.getFlowRuleCount(deviceId, state);
+    }
+
+    @Override
     public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
         checkPermission(FLOWRULE_READ);
         checkNotNull(deviceId, DEVICE_ID_NULL);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index d21984a..4294f6e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -18,6 +18,8 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Streams;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.OrderedExecutor;
 import org.onlab.util.Tools;
@@ -194,6 +196,7 @@
         .register(KryoNamespaces.API)
         .register(BucketId.class)
         .register(FlowBucket.class)
+        .register(ImmutablePair.class)
         .build());
 
     protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
@@ -322,8 +325,11 @@
             GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
         clusterCommunicator.addSubscriber(
             GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
-        clusterCommunicator.addSubscriber(
-            GET_DEVICE_FLOW_COUNT, serializer::decode, flowTable::getFlowRuleCount, serializer::encode, executor);
+        clusterCommunicator.<Pair<DeviceId, FlowEntryState>, Integer>addSubscriber(
+            GET_DEVICE_FLOW_COUNT,
+            serializer::decode,
+            p -> flowTable.getFlowRuleCount(p.getLeft(), p.getRight()),
+            serializer::encode, executor);
         clusterCommunicator.addSubscriber(
             REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
     }
@@ -352,6 +358,11 @@
 
     @Override
     public int getFlowRuleCount(DeviceId deviceId) {
+        return getFlowRuleCount(deviceId, null);
+    }
+
+    @Override
+    public int getFlowRuleCount(DeviceId deviceId, FlowEntryState state) {
         NodeId master = mastershipService.getMasterFor(deviceId);
         if (master == null) {
             log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
@@ -359,19 +370,19 @@
         }
 
         if (Objects.equals(local, master)) {
-            return flowTable.getFlowRuleCount(deviceId);
+            return flowTable.getFlowRuleCount(deviceId, state);
         }
 
         log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
         return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
-            deviceId,
-            GET_DEVICE_FLOW_COUNT,
-            serializer::encode,
-            serializer::decode,
-            master),
-            FLOW_RULE_STORE_TIMEOUT_MILLIS,
-            TimeUnit.MILLISECONDS,
-            0);
+                Pair.of(deviceId, state),
+                GET_DEVICE_FLOW_COUNT,
+                serializer::encode,
+                serializer::decode,
+                master),
+                FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                TimeUnit.MILLISECONDS,
+                0);
     }
 
     @Override
@@ -755,6 +766,23 @@
         }
 
         /**
+         * Returns the count of flow rules in the given state for the given device.
+         *
+         * @param deviceId the device for which to return the flow rule count
+         * @return the flow rule count for the given device
+         */
+        public int getFlowRuleCount(DeviceId deviceId, FlowEntryState state) {
+            if (state == null) {
+                return getFlowRuleCount(deviceId);
+            }
+            return (int) getFlowTable(deviceId)
+                .getFlowEntries()
+                .stream()
+                .filter(rule -> rule.state() == state)
+                .count();
+        }
+
+        /**
          * Returns the flow entry for the given rule.
          *
          * @param rule the rule for which to return the flow entry
@@ -997,4 +1025,14 @@
             mastershipTermLifecycles.removeListener(this);
         }
     }
+
+    private static class CountMessage {
+        private final DeviceId deviceId;
+        private final FlowEntryState state;
+
+        CountMessage(DeviceId deviceId, FlowEntryState state) {
+            this.deviceId = deviceId;
+            this.state = state;
+        }
+    }
 }
\ No newline at end of file
diff --git a/web/api/src/main/java/org/onosproject/rest/resources/StatisticsWebResource.java b/web/api/src/main/java/org/onosproject/rest/resources/StatisticsWebResource.java
index 9667182..c38d5f8 100644
--- a/web/api/src/main/java/org/onosproject/rest/resources/StatisticsWebResource.java
+++ b/web/api/src/main/java/org/onosproject/rest/resources/StatisticsWebResource.java
@@ -38,6 +38,7 @@
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.TableStatisticsEntry;
 import org.onosproject.net.link.LinkService;
@@ -346,7 +347,7 @@
         final ObjectNode root = mapper().createObjectNode();
         final ArrayNode rootArrayNode = root.putArray("statistics");
         for (final Device device : devices) {
-            long activeEntries = service.getActiveFlowRuleCount(device.id());
+            int activeEntries = service.getFlowRuleCount(device.id(), FlowEntry.FlowEntryState.ADDED);
             final ObjectNode entry = mapper().createObjectNode();
             entry.put("device", device.id().toString());
             entry.put("activeEntries", activeEntries);