[Emu] openTAM: FlowStatisticManager, DistributedFlowStatisticStore, get-flow-stats CLI Implementation and NewAdaptiveFlowStatsCollector update and typo

 - GetFlowStatistics.java
   .Fixed function name typo: immediateLoad()
 - SummaryFlowEntryWithLoad.java
   .Added javadoc
 - TypedFlowEntryWithLoad.java
   .Added javadoc,
   .and replace checknotnull and throw NullPointerException in typedPollInterval() at line 104

Change-Id: I23d2eaf234d0affeb5f927275148d9165c66c774
diff --git a/cli/src/main/java/org/onosproject/cli/Comparators.java b/cli/src/main/java/org/onosproject/cli/Comparators.java
index b0cbbdd..1df2f04 100644
--- a/cli/src/main/java/org/onosproject/cli/Comparators.java
+++ b/cli/src/main/java/org/onosproject/cli/Comparators.java
@@ -25,6 +25,8 @@
 import org.onosproject.net.Port;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.group.Group;
+
+import org.onosproject.net.statistic.TypedFlowEntryWithLoad;
 import org.onosproject.net.topology.TopologyCluster;
 
 import java.util.Comparator;
@@ -115,4 +117,12 @@
     public static final Comparator<Interface> INTERFACES_COMPARATOR = (intf1, intf2) ->
             CONNECT_POINT_COMPARATOR.compare(intf1.connectPoint(), intf2.connectPoint());
 
+    public static final Comparator<TypedFlowEntryWithLoad> TYPEFLOWENTRY_WITHLOAD_COMPARATOR =
+            new Comparator<TypedFlowEntryWithLoad>() {
+                @Override
+                public int compare(TypedFlowEntryWithLoad fe1, TypedFlowEntryWithLoad fe2) {
+                    long delta = fe1.load().rate() -  fe2.load().rate();
+                    return delta == 0 ? 0 : (delta > 0 ? -1 : +1);
+                }
+            };
 }
diff --git a/cli/src/main/java/org/onosproject/cli/net/GetFlowStatistics.java b/cli/src/main/java/org/onosproject/cli/net/GetFlowStatistics.java
new file mode 100644
index 0000000..cafe87f
--- /dev/null
+++ b/cli/src/main/java/org/onosproject/cli/net/GetFlowStatistics.java
@@ -0,0 +1,323 @@
+/*

+ * Copyright 2015 Open Networking Laboratory

+ *

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+package org.onosproject.cli.net;

+

+import org.apache.karaf.shell.commands.Argument;

+import org.apache.karaf.shell.commands.Command;

+import org.apache.karaf.shell.commands.Option;

+import org.onosproject.cli.AbstractShellCommand;

+import org.onosproject.net.ConnectPoint;

+import org.onosproject.net.Device;

+import org.onosproject.net.DeviceId;

+import org.onosproject.net.Port;

+import org.onosproject.net.PortNumber;

+import org.onosproject.net.device.DeviceService;

+import org.onosproject.net.flow.TypedStoredFlowEntry;

+import org.onosproject.net.flow.instructions.Instruction;

+import org.onosproject.net.statistic.FlowStatisticService;

+import org.onosproject.net.statistic.SummaryFlowEntryWithLoad;

+import org.onosproject.net.statistic.TypedFlowEntryWithLoad;

+

+import java.util.List;

+import java.util.Map;

+

+import static org.onosproject.net.DeviceId.deviceId;

+import static org.onosproject.net.PortNumber.portNumber;

+

+/**

+ * Fetches flow statistics with a flow type and instruction type.

+ */

+@Command(scope = "onos", name = "get-flow-stats",

+        description = "Fetches flow stats for a connection point with given flow type and instruction type")

+public class GetFlowStatistics extends AbstractShellCommand {

+    @Argument(index = 0, name = "devicePort",

+            description = "Device[/Port] connectPoint Description",

+            required = true, multiValued = false)

+    String devicePort = null;

+

+    @Option(name = "-s", aliases = "--summary",

+            description = "Show flow stats summary",

+            required = false, multiValued = false)

+    boolean showSummary = true; // default summary

+

+    @Option(name = "-a", aliases = "--all",

+            description = "Show flow stats all",

+            required = false, multiValued = false)

+    boolean showAll = false;

+

+    @Option(name = "-t", aliases = "--topn",

+            description = "Show flow stats topn",

+            required = false, multiValued = false)

+    String showTopn = null;

+

+    @Option(name = "-f", aliases = "--flowType",

+            description = "Flow live type, It includes IMMEDIATE, SHORT, MID, LONG, UNKNOWN"

+                          + ", and is valid with -a or -t option only",

+            required = false, multiValued = false)

+    String flowLiveType = null;

+

+    @Option(name = "-i", aliases = "--instructionType",

+            description = "Flow instruction type, It includes DROP, OUTPUT, GROUP, L0MODIFICATION, L2MODIFICATION,"

+                    + " TABLE, L3MODIFICATION, METADATA"

+                    + ", and is valid with -a or -t option only",

+            required = false, multiValued = false)

+    String instructionType = null;

+

+    @Override

+    protected void execute() {

+        DeviceService deviceService = get(DeviceService.class);

+        FlowStatisticService flowStatsService = get(FlowStatisticService.class);

+

+        String deviceURI = getDeviceId(devicePort);

+        String portURI = getPortNumber(devicePort);

+

+        DeviceId ingressDeviceId = deviceId(deviceURI);

+        PortNumber ingressPortNumber;

+        if (portURI.length() == 0) {

+            ingressPortNumber = null;

+        } else {

+            ingressPortNumber = portNumber(portURI);

+        }

+

+        Device device = deviceService.getDevice(ingressDeviceId);

+        if (device == null) {

+            error("No such device %s", ingressDeviceId.uri());

+            return;

+        }

+

+        if (ingressPortNumber != null) {

+            Port port = deviceService.getPort(ingressDeviceId, ingressPortNumber);

+            if (port == null) {

+                error("No such port %s on device %s", portURI, ingressDeviceId.uri());

+                return;

+            }

+        }

+

+        if (flowLiveType != null) {

+            flowLiveType = flowLiveType.toUpperCase();

+        }

+        if (instructionType != null) {

+            instructionType = instructionType.toUpperCase();

+        }

+

+        // convert String to FlowLiveType and check validity

+        TypedStoredFlowEntry.FlowLiveType inLiveType;

+        if (flowLiveType == null) {

+            inLiveType = null;

+        } else {

+            inLiveType = getFlowLiveType(flowLiveType);

+            if (inLiveType == null) {

+                error("Invalid flow live type [%s] error", flowLiveType);

+                return;

+            }

+        }

+        // convert String to InstructionType and check validity

+        Instruction.Type inInstructionType;

+        if (instructionType == null) {

+            inInstructionType = null;

+        } else {

+            inInstructionType = getInstructionType(instructionType);

+            if (inInstructionType == null) {

+                error("Invalid instruction type [%s] error", instructionType);

+                return;

+            }

+        }

+

+        if (showTopn != null) {

+            int topn = Integer.parseInt(showTopn);

+

+            if (topn <= 0) {

+                topn = 100; //default value

+            } else if (topn > 1000) {

+                topn = 1000; //max value

+            }

+

+            // print show topn head line with type

+            print("deviceId=%s, show TOPN=%s flows, live type=%s, instruction type=%s",

+                    deviceURI,

+                    Integer.toString(topn),

+                    flowLiveType == null ? "ALL" : flowLiveType,

+                    instructionType == null ? "ALL" : instructionType);

+            if (ingressPortNumber == null) {

+                Map<ConnectPoint, List<TypedFlowEntryWithLoad>> typedFlowLoadMap =

+                          flowStatsService.loadTopnByType(device, inLiveType, inInstructionType, topn);

+                // print all ports topn flows load for a given device

+                for (ConnectPoint cp : typedFlowLoadMap.keySet()) {

+                    printPortFlowsLoad(cp, typedFlowLoadMap.get(cp));

+                }

+            } else {

+                List<TypedFlowEntryWithLoad> typedFlowLoad =

+                        flowStatsService.loadTopnByType(device, ingressPortNumber, inLiveType, inInstructionType, topn);

+                // print device/port topn flows load

+                ConnectPoint cp = new ConnectPoint(ingressDeviceId, ingressPortNumber);

+                printPortFlowsLoad(cp, typedFlowLoad);

+            }

+        } else if (showAll) { // is true?

+            // print show all head line with type

+            print("deviceId=%s, show ALL flows, live type=%s, instruction type=%s",

+                    deviceURI,

+                    flowLiveType == null ? "ALL" : flowLiveType,

+                    instructionType == null ? "ALL" : instructionType);

+            if (ingressPortNumber == null) {

+                Map<ConnectPoint, List<TypedFlowEntryWithLoad>> typedFlowLoadMap =

+                        flowStatsService.loadAllByType(device, inLiveType, inInstructionType);

+                // print all ports all flows load for a given device

+                for (ConnectPoint cp : typedFlowLoadMap.keySet()) {

+                    printPortFlowsLoad(cp, typedFlowLoadMap.get(cp));

+                }

+            } else {

+                List<TypedFlowEntryWithLoad> typedFlowLoad =

+                        flowStatsService.loadAllByType(device, ingressPortNumber, inLiveType, inInstructionType);

+                // print device/port all flows load

+                ConnectPoint cp = new ConnectPoint(ingressDeviceId, ingressPortNumber);

+                printPortFlowsLoad(cp, typedFlowLoad);

+            }

+        } else { // if (showSummary == true) //always is true

+            // print show summary head line

+            print("deviceId=%s, show SUMMARY flows", deviceURI);

+            if (ingressPortNumber == null) {

+                Map<ConnectPoint, SummaryFlowEntryWithLoad> summaryFlowLoadMap =

+                        flowStatsService.loadSummary(device);

+                // print all ports flow load summary for a given device

+                for (ConnectPoint cp : summaryFlowLoadMap.keySet()) {

+                    printPortSummaryLoad(cp, summaryFlowLoadMap.get(cp));

+                }

+            } else {

+                SummaryFlowEntryWithLoad summaryFlowLoad =

+                        flowStatsService.loadSummary(device, ingressPortNumber);

+                // print device/port flow load summary

+                ConnectPoint cp = new ConnectPoint(ingressDeviceId, ingressPortNumber);

+                printPortSummaryLoad(cp, summaryFlowLoad);

+            }

+        }

+    }

+

+    /**

+     * Extracts the port number portion of the ConnectPoint.

+     *

+     * @param deviceString string representing the device/port

+     * @return port number as a string, empty string if the port is not found

+     */

+    private String getPortNumber(String deviceString) {

+        if (deviceString == null) {

+            return "";

+        }

+

+        int slash = deviceString.indexOf('/');

+        if (slash <= 0) {

+            return ""; // return when no port number

+        }

+        return deviceString.substring(slash + 1, deviceString.length());

+    }

+

+    /**

+     * Extracts the device ID portion of the ConnectPoint.

+     *

+     * @param deviceString string representing the device/port

+     * @return device ID string

+     */

+    private String getDeviceId(String deviceString) {

+        if (deviceString == null) {

+            return "";

+        }

+

+        int slash = deviceString.indexOf('/');

+        if (slash <= 0) {

+            return deviceString; // return only included device ID

+        }

+        return deviceString.substring(0, slash);

+    }

+

+    /**

+     * converts string of flow live type to FloeLiveType enum.

+     *

+     * @param liveType string representing the flow live type

+     * @return TypedStoredFlowEntry.FlowLiveType

+     */

+    private TypedStoredFlowEntry.FlowLiveType getFlowLiveType(String liveType) {

+        String liveTypeUC = liveType.toUpperCase();

+

+        if (liveTypeUC.equals("IMMEDIATE")) {

+            return TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW;

+        } else if (liveTypeUC.equals("SHORT")) {

+            return TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW;

+        } else if (liveTypeUC.equals("MID")) {

+            return TypedStoredFlowEntry.FlowLiveType.MID_FLOW;

+        } else if (liveTypeUC.equals("LONG")) {

+            return TypedStoredFlowEntry.FlowLiveType.LONG_FLOW;

+        } else if (liveTypeUC.equals("UNKNOWN")) {

+            return TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW;

+        } else {

+            return null; // flow live type error

+        }

+    }

+

+    /**

+     * converts string of instruction type to Instruction type enum.

+     *

+     * @param instType string representing the instruction type

+     * @return Instruction.Type

+     */

+    private Instruction.Type getInstructionType(String instType) {

+        String instTypeUC = instType.toUpperCase();

+

+        if (instTypeUC.equals("DROP")) {

+            return Instruction.Type.DROP;

+        } else if (instTypeUC.equals("OUTPUT")) {

+            return Instruction.Type.OUTPUT;

+        } else if (instTypeUC.equals("GROUP")) {

+            return Instruction.Type.GROUP;

+        } else if (instTypeUC.equals("L0MODIFICATION")) {

+            return Instruction.Type.L0MODIFICATION;

+        } else if (instTypeUC.equals("L2MODIFICATION")) {

+            return Instruction.Type.L2MODIFICATION;

+        } else if (instTypeUC.equals("TABLE")) {

+            return Instruction.Type.TABLE;

+        } else if (instTypeUC.equals("L3MODIFICATION")) {

+            return Instruction.Type.L3MODIFICATION;

+        } else if (instTypeUC.equals("METADATA")) {

+            return Instruction.Type.METADATA;

+        } else {

+             return null; // instruction type error

+        }

+    }

+

+    private void printPortFlowsLoad(ConnectPoint cp, List<TypedFlowEntryWithLoad> typedFlowLoad) {

+       print("  deviceId/Port=%s/%s, %s flows", cp.elementId(), cp.port(), typedFlowLoad.size());

+        for (TypedFlowEntryWithLoad tfel: typedFlowLoad) {

+            TypedStoredFlowEntry tfe =  tfel.typedStoredFlowEntry();

+            print("    flowId=%s, state=%s, liveType=%s, life=%s -> %s",

+                    Long.toHexString(tfe.id().value()),

+                    tfe.state(),

+                    tfe.flowLiveType(),

+                    tfe.life(),

+                    tfel.load().isValid() ? tfel.load() : "Load{rate=0, NOT VALID}");

+        }

+    }

+

+    private void printPortSummaryLoad(ConnectPoint cp, SummaryFlowEntryWithLoad summaryFlowLoad) {

+        print("  deviceId/Port=%s/%s, Total=%s, Immediate=%s, Short=%s, Mid=%s, Long=%s, Unknown=%s",

+                cp.elementId(),

+                cp.port(),

+                summaryFlowLoad.totalLoad().isValid() ? summaryFlowLoad.totalLoad() : "Load{rate=0, NOT VALID}",

+                summaryFlowLoad.immediateLoad().isValid() ? summaryFlowLoad.immediateLoad() : "Load{rate=0, NOT VALID}",

+                summaryFlowLoad.shortLoad().isValid() ? summaryFlowLoad.shortLoad() : "Load{rate=0, NOT VALID}",

+                summaryFlowLoad.midLoad().isValid() ? summaryFlowLoad.midLoad() : "Load{rate=0, NOT VALID}",

+                summaryFlowLoad.longLoad().isValid() ? summaryFlowLoad.longLoad() : "Load{rate=0, NOT VALID}",

+                summaryFlowLoad.unknownLoad().isValid() ? summaryFlowLoad.unknownLoad() : "Load{rate=0, NOT VALID}");

+    }

+}

diff --git a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 459ffa9..8c56a49 100644
--- a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -222,6 +222,12 @@
             </completers>
         </command>
         <command>
+            <action class="org.onosproject.cli.net.GetFlowStatistics"/>
+            <completers>
+                <ref component-id="deviceIdCompleter"/>
+            </completers>
+        </command>
+        <command>
             <action class="org.onosproject.cli.net.AddMultiPointToSinglePointIntentCommand"/>
             <completers>
                 <ref component-id="connectPointCompleter"/>
@@ -333,7 +339,6 @@
         <command>
             <action class="org.onosproject.cli.net.InterfacesListCommand"/>
         </command>
-
         <command>
             <action class="org.onosproject.cli.net.GroupsListCommand"/>
         </command>
diff --git a/core/api/src/main/java/org/onosproject/net/statistic/FlowStatisticService.java b/core/api/src/main/java/org/onosproject/net/statistic/FlowStatisticService.java
new file mode 100644
index 0000000..f59670b
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/statistic/FlowStatisticService.java
@@ -0,0 +1,105 @@
+/*

+ * Copyright 2015 Open Networking Laboratory

+ *

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+package org.onosproject.net.statistic;

+

+import org.onosproject.net.ConnectPoint;

+import org.onosproject.net.Device;

+import org.onosproject.net.PortNumber;

+import org.onosproject.net.flow.TypedStoredFlowEntry;

+import org.onosproject.net.flow.instructions.Instruction;

+

+import java.util.List;

+import java.util.Map;

+

+/**

+ * Service for obtaining individual flow statistic information about device and link in the system.

+ * Basic statistics are obtained from the StatisticService

+ */

+public interface FlowStatisticService {

+

+    /**

+     * Obtain the summary load list for the device with the given link.

+     *

+     * @param device the Device  to query.

+     * @return map of summary flow entry load

+     */

+    Map<ConnectPoint, SummaryFlowEntryWithLoad> loadSummary(Device device);

+

+    /**

+     * Obtain the summary load for the device with the given link or port.

+     *

+     * @param device the Device to query.

+     * @param pNumber the port number to query.

+     * @return summary flow entry load

+     */

+    SummaryFlowEntryWithLoad loadSummary(Device device, PortNumber pNumber);

+

+    /**

+     * Obtain the set of the flow type and load list for the device with the given link.

+     *

+     * @param device the Device  to query.

+     * @param liveType the FlowLiveType  to filter, null means no filtering .

+     * @param instType the InstructionType to filter, null means no filtering.

+     * @return map of flow entry load

+     */

+    Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadAllByType(Device device,

+                                                                  TypedStoredFlowEntry.FlowLiveType liveType,

+                                                                  Instruction.Type instType);

+

+    /**

+     * Obtain the flow type and load list for the device with the given link or port.

+     *

+     * @param device the Device to query.

+     * @param pNumber the port number of the Device to query

+     * @param liveType the FlowLiveType  to filter, null means no filtering .

+     * @param instType the InstructionType to filter, null means no filtering.

+     * @return list of flow entry load

+     */

+    List<TypedFlowEntryWithLoad> loadAllByType(Device device, PortNumber pNumber,

+                                               TypedStoredFlowEntry.FlowLiveType liveType,

+                                               Instruction.Type instType);

+

+    /**

+     * Obtain the set of the flow type and load topn list for the device with the given link.

+     *

+     * @param device the Device  to query.

+     * @param liveType the FlowLiveType  to filter, null means no filtering .

+     * @param instType the InstructionType to filter, null means no filtering.

+     * @param topn the top number to filter, null means no filtering.

+     * @return map of flow entry load

+     */

+    Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadTopnByType(Device device,

+                                                                   TypedStoredFlowEntry.FlowLiveType liveType,

+                                                                   Instruction.Type instType,

+                                                                   int topn);

+

+    /**

+     * Obtain the flow type and load topn list for the device with the given link or port.

+     *

+     * @param device the Device  to query.

+     * @param pNumber the port number of the Device to query

+     * @param liveType the FlowLiveType  to filter, null means no filtering .

+     * @param instType the InstructionType to filter, null means no filtering.

+     * @return list of flow entry load

+     */

+    List<TypedFlowEntryWithLoad> loadTopnByType(Device device, PortNumber pNumber,

+                                                TypedStoredFlowEntry.FlowLiveType liveType,

+                                                Instruction.Type instType,

+                                                int topn);

+}

+

+

diff --git a/core/api/src/main/java/org/onosproject/net/statistic/FlowStatisticStore.java b/core/api/src/main/java/org/onosproject/net/statistic/FlowStatisticStore.java
new file mode 100644
index 0000000..3c2aa89
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/statistic/FlowStatisticStore.java
@@ -0,0 +1,65 @@
+/*

+ * Copyright 2015 Open Networking Laboratory

+ *

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+package org.onosproject.net.statistic;

+

+import org.onosproject.net.ConnectPoint;

+import org.onosproject.net.flow.FlowEntry;

+import org.onosproject.net.flow.FlowRule;

+

+import java.util.Set;

+

+/**

+ * Flow Store to house the computed statistics.

+ */

+public interface FlowStatisticStore {

+    /**

+     * Remove entries associated with this rule.

+     *

+     * @param rule {@link org.onosproject.net.flow.FlowRule}

+     */

+    void removeFlowStatistic(FlowRule rule);

+

+    /**

+     * Adds a flow stats observation for a flow rule. The previous flow will be removed.

+     *

+     * @param rule a {@link org.onosproject.net.flow.FlowEntry}

+     */

+    void addFlowStatistic(FlowEntry rule);

+

+    /**

+     * Updates a stats observation for a flow rule. The old flow stats will be moved to previous stats.

+     *

+     * @param rule a {@link org.onosproject.net.flow.FlowEntry}

+     */

+    void updateFlowStatistic(FlowEntry rule);

+

+    /**

+     * Fetches the current observed flow stats values.

+     *

+     * @param connectPoint the port to fetch information for

+     * @return set of current flow rules

+     */

+    Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint);

+

+    /**

+     * Fetches the current observed flow stats values.

+     *

+     * @param connectPoint the port to fetch information for

+     * @return set of current values

+     */

+    Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint);

+}

diff --git a/core/api/src/main/java/org/onosproject/net/statistic/SummaryFlowEntryWithLoad.java b/core/api/src/main/java/org/onosproject/net/statistic/SummaryFlowEntryWithLoad.java
new file mode 100644
index 0000000..60da636
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/statistic/SummaryFlowEntryWithLoad.java
@@ -0,0 +1,143 @@
+/*

+ * Copyright 2015 Open Networking Laboratory

+ *

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+package org.onosproject.net.statistic;

+

+import org.onosproject.net.ConnectPoint;

+

+/**

+ * Summary Load classified by flow live type.

+ */

+public class SummaryFlowEntryWithLoad {

+    private ConnectPoint cp;

+    private Load totalLoad;

+    private Load immediateLoad;

+    private Load shortLoad;

+    private Load midLoad;

+    private Load longLoad;

+    private Load unknownLoad;

+

+    /**

+     * Creates a new summary flow entry having load for the given connect point and total load.

+     *

+     * @param cp        connect point

+     * @param totalLoad total load

+     */

+    public SummaryFlowEntryWithLoad(ConnectPoint cp, Load totalLoad) {

+        this.cp = cp;

+        this.totalLoad = totalLoad;

+        this.immediateLoad = new DefaultLoad();

+        this.shortLoad = new DefaultLoad();

+        this.midLoad = new DefaultLoad();

+        this.longLoad = new DefaultLoad();

+        this.unknownLoad = new DefaultLoad();

+    }

+

+    /**

+     * Creates a new summary flow entry having load for the given connect point

+     * and total, immediate, short, mid, and long load.

+     *

+     * @param cp        connect point

+     * @param totalLoad total load

+     * @param immediateLoad immediate load

+     * @param shortLoad short load

+     * @param midLoad mid load

+     * @param longLoad long load

+     */

+    public SummaryFlowEntryWithLoad(ConnectPoint cp,

+                                    Load totalLoad, Load immediateLoad, Load shortLoad, Load midLoad, Load longLoad) {

+        this.cp = cp;

+        this.totalLoad = totalLoad;

+        this.immediateLoad = immediateLoad;

+        this.shortLoad = shortLoad;

+        this.midLoad = midLoad;

+        this.longLoad = longLoad;

+        this.unknownLoad = new DefaultLoad();

+    }

+

+    /**

+     * Creates a new summary flow entry having load for the given connect point

+     * and total, immediate, short, mid, long, and unknown load.

+     *

+     * @param cp        connect point

+     * @param totalLoad total load

+     * @param immediateLoad immediate load

+     * @param shortLoad short load

+     * @param midLoad mid load

+     * @param longLoad long load

+     * @param unknownLoad long load

+     */

+    public SummaryFlowEntryWithLoad(ConnectPoint cp,

+                                    Load totalLoad, Load immediateLoad,

+                                    Load shortLoad, Load midLoad, Load longLoad, Load unknownLoad) {

+        this.cp = cp;

+        this.totalLoad = totalLoad;

+        this.immediateLoad = immediateLoad;

+        this.shortLoad = shortLoad;

+        this.midLoad = midLoad;

+        this.longLoad = longLoad;

+        this.unknownLoad = unknownLoad;

+    }

+

+    /**

+     * Returns connect point.

+     */

+    public ConnectPoint connectPoint() {

+        return cp;

+    }

+

+    /**

+     * Returns total load of connect point.

+     */

+    public Load totalLoad() {

+        return totalLoad;

+    }

+

+    /**

+     * Returns immediate load of connect point.

+     */

+    public Load immediateLoad() {

+        return immediateLoad;

+    }

+

+    /**

+     * Returns short load of connect point.

+     */

+    public Load shortLoad() {

+        return shortLoad;

+    }

+

+    /**

+     * Returns mid load of connect point.

+     */

+    public Load midLoad() {

+        return midLoad;

+    }

+

+    /**

+     * Returns long load of connect point.

+     */

+    public Load longLoad() {

+        return longLoad;

+    }

+

+    /**

+     * Returns unknown load of connect point.

+     */

+    public Load unknownLoad() {

+        return unknownLoad;

+    }

+}

diff --git a/core/api/src/main/java/org/onosproject/net/statistic/TypedFlowEntryWithLoad.java b/core/api/src/main/java/org/onosproject/net/statistic/TypedFlowEntryWithLoad.java
new file mode 100644
index 0000000..3e2dbdf
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/statistic/TypedFlowEntryWithLoad.java
@@ -0,0 +1,143 @@
+/*

+ * Copyright 2015 Open Networking Laboratory

+ *

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+package org.onosproject.net.statistic;

+

+import org.onosproject.net.ConnectPoint;

+import org.onosproject.net.flow.FlowEntry;

+import org.onosproject.net.flow.TypedStoredFlowEntry;

+import org.onosproject.net.flow.DefaultTypedFlowEntry;

+

+import static com.google.common.base.Preconditions.checkNotNull;

+

+/**

+ * Load of flow entry of flow live type.

+ */

+public class TypedFlowEntryWithLoad {

+    private ConnectPoint cp;

+    private TypedStoredFlowEntry tfe;

+    private Load load;

+

+    //TODO: make this variables class, and share with NewAdaptivceFlowStatsCollector class

+    private static final int CAL_AND_POLL_INTERVAL = 5; // means SHORT_POLL_INTERVAL

+    private static final int MID_POLL_INTERVAL = 10;

+    private static final int LONG_POLL_INTERVAL = 15;

+

+

+    public TypedFlowEntryWithLoad(ConnectPoint cp, TypedStoredFlowEntry tfe, Load load) {

+        this.cp = cp;

+        this.tfe = tfe;

+        this.load = load;

+    }

+

+    public TypedFlowEntryWithLoad(ConnectPoint cp, TypedStoredFlowEntry tfe) {

+        this.cp = cp;

+        this.tfe = tfe;

+        this.load = new DefaultLoad(tfe.bytes(), 0, typedPollInterval(tfe));

+    }

+

+    public TypedFlowEntryWithLoad(ConnectPoint cp, FlowEntry fe) {

+        this.cp = cp;

+        this.tfe = newTypedStoredFlowEntry(fe);

+        this.load = new DefaultLoad(fe.bytes(), 0, typedPollInterval(this.tfe));

+    }

+

+    public ConnectPoint connectPoint() {

+        return cp;

+    }

+    public TypedStoredFlowEntry typedStoredFlowEntry() {

+        return tfe;

+    }

+    public Load load() {

+        return load;

+    }

+    public void setLoad(Load load) {

+        this.load = load;

+    }

+

+    /**

+     * Returns short polling interval.

+     */

+    public static int shortPollInterval() {

+        return CAL_AND_POLL_INTERVAL;

+    }

+

+    /**

+     * Returns mid polling interval.

+     */

+    public static int midPollInterval() {

+        return MID_POLL_INTERVAL;

+    }

+

+    /**

+     * Returns long polling interval.

+     */

+    public static int longPollInterval() {

+        return LONG_POLL_INTERVAL;

+    }

+

+    /**

+     * Returns average polling interval.

+     */

+    public static int avgPollInterval() {

+        return (CAL_AND_POLL_INTERVAL + MID_POLL_INTERVAL + LONG_POLL_INTERVAL) / 3;

+    }

+

+    /**

+     * Returns current typed flow entry's polling interval.

+     *

+     * @param tfe typed flow entry

+     */

+    public static long typedPollInterval(TypedStoredFlowEntry tfe) {

+        checkNotNull(tfe, "TypedStoredFlowEntry cannot be null");

+

+        switch (tfe.flowLiveType()) {

+            case LONG_FLOW:

+                return LONG_POLL_INTERVAL;

+            case MID_FLOW:

+                return MID_POLL_INTERVAL;

+            case SHORT_FLOW:

+            case IMMEDIATE_FLOW:

+            default:

+                return CAL_AND_POLL_INTERVAL;

+        }

+    }

+

+    /**

+     * Creates a new typed flow entry with the given flow entry fe.

+     *

+     * @param fe flow entry

+     */

+    public static TypedStoredFlowEntry newTypedStoredFlowEntry(FlowEntry fe) {

+        if (fe == null) {

+            return null;

+        }

+

+        long life = fe.life();

+

+        if (life >= LONG_POLL_INTERVAL) {

+            return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.LONG_FLOW);

+        } else if (life >= MID_POLL_INTERVAL) {

+            return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.MID_FLOW);

+        } else if (life >= CAL_AND_POLL_INTERVAL) {

+            return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW);

+        } else if (life >= 0) {

+            return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW);

+        } else { // life < 0

+            return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW);

+        }

+    }

+}

diff --git a/core/net/pom.xml b/core/net/pom.xml
index 9ea0007..c5d3126 100644
--- a/core/net/pom.xml
+++ b/core/net/pom.xml
@@ -52,6 +52,20 @@
 
         <dependency>
             <groupId>org.onosproject</groupId>
+            <version>${project.version}</version>
+            <artifactId>onos-cli</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-cli</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
             <artifactId>onos-core-common</artifactId>
             <version>${project.version}</version>
             <classifier>tests</classifier>
diff --git a/core/net/src/main/java/org/onosproject/net/statistic/impl/FlowStatisticManager.java b/core/net/src/main/java/org/onosproject/net/statistic/impl/FlowStatisticManager.java
new file mode 100644
index 0000000..6515ef3
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/statistic/impl/FlowStatisticManager.java
@@ -0,0 +1,634 @@
+/*

+ * Copyright 2015 Open Networking Laboratory

+ *

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+package org.onosproject.net.statistic.impl;

+

+import com.google.common.base.MoreObjects;

+import com.google.common.base.Predicate;

+import com.google.common.collect.ImmutableSet;

+import org.apache.felix.scr.annotations.Activate;

+import org.apache.felix.scr.annotations.Component;

+import org.apache.felix.scr.annotations.Deactivate;

+import org.apache.felix.scr.annotations.Reference;

+import org.apache.felix.scr.annotations.ReferenceCardinality;

+import org.apache.felix.scr.annotations.Service;

+import org.onosproject.cli.Comparators;

+import org.onosproject.net.ConnectPoint;

+import org.onosproject.net.Device;

+import org.onosproject.net.Port;

+import org.onosproject.net.PortNumber;

+import org.onosproject.net.device.DeviceService;

+import org.onosproject.net.flow.DefaultTypedFlowEntry;

+import org.onosproject.net.flow.FlowEntry;

+import org.onosproject.net.flow.FlowRule;

+import org.onosproject.net.flow.FlowRuleEvent;

+import org.onosproject.net.flow.FlowRuleListener;

+import org.onosproject.net.flow.FlowRuleService;

+import org.onosproject.net.flow.TypedStoredFlowEntry;

+import org.onosproject.net.flow.instructions.Instruction;

+import org.onosproject.net.statistic.DefaultLoad;

+import org.onosproject.net.statistic.FlowStatisticService;

+import org.onosproject.net.statistic.Load;

+import org.onosproject.net.statistic.FlowStatisticStore;

+import org.onosproject.net.statistic.SummaryFlowEntryWithLoad;

+import org.onosproject.net.statistic.TypedFlowEntryWithLoad;

+

+import org.slf4j.Logger;

+

+import java.util.ArrayList;

+import java.util.HashMap;

+import java.util.List;

+import java.util.Map;

+import java.util.Objects;

+import java.util.Set;

+import java.util.TreeMap;

+import java.util.stream.Collectors;

+

+import static com.google.common.base.Preconditions.checkNotNull;

+import static org.onosproject.security.AppGuard.checkPermission;

+import static org.slf4j.LoggerFactory.getLogger;

+import static org.onosproject.security.AppPermission.Type.*;

+

+/**

+ * Provides an implementation of the Flow Statistic Service.

+ */

+@Component(immediate = true, enabled = true)

+@Service

+public class FlowStatisticManager implements FlowStatisticService {

+    private final Logger log = getLogger(getClass());

+

+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)

+    protected FlowRuleService flowRuleService;

+

+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)

+    protected FlowStatisticStore flowStatisticStore;

+

+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)

+    protected DeviceService deviceService;

+

+    private final InternalFlowRuleStatsListener frListener = new InternalFlowRuleStatsListener();

+

+    @Activate

+    public void activate() {

+        flowRuleService.addListener(frListener);

+        log.info("Started");

+    }

+

+    @Deactivate

+    public void deactivate() {

+        flowRuleService.removeListener(frListener);

+        log.info("Stopped");

+    }

+

+    @Override

+    public Map<ConnectPoint, SummaryFlowEntryWithLoad> loadSummary(Device device) {

+        checkPermission(STATISTIC_READ);

+

+        Map<ConnectPoint, SummaryFlowEntryWithLoad> summaryLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);

+

+        if (device == null) {

+            return summaryLoad;

+        }

+

+        List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));

+

+        for (Port port : ports) {

+            ConnectPoint cp = new ConnectPoint(device.id(), port.number());

+            SummaryFlowEntryWithLoad sfe = loadSummaryPortInternal(cp);

+            summaryLoad.put(cp, sfe);

+        }

+

+        return summaryLoad;

+    }

+

+    @Override

+    public SummaryFlowEntryWithLoad loadSummary(Device device, PortNumber pNumber) {

+        checkPermission(STATISTIC_READ);

+

+        ConnectPoint cp = new ConnectPoint(device.id(), pNumber);

+        return loadSummaryPortInternal(cp);

+    }

+

+    @Override

+    public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadAllByType(Device device,

+                                                                  TypedStoredFlowEntry.FlowLiveType liveType,

+                                                                  Instruction.Type instType) {

+        checkPermission(STATISTIC_READ);

+

+        Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);

+

+        if (device == null) {

+            return allLoad;

+        }

+

+        List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));

+

+        for (Port port : ports) {

+            ConnectPoint cp = new ConnectPoint(device.id(), port.number());

+            List<TypedFlowEntryWithLoad> tfel = loadAllPortInternal(cp, liveType, instType);

+            allLoad.put(cp, tfel);

+        }

+

+        return allLoad;

+    }

+

+    @Override

+    public List<TypedFlowEntryWithLoad> loadAllByType(Device device, PortNumber pNumber,

+                                               TypedStoredFlowEntry.FlowLiveType liveType,

+                                               Instruction.Type instType) {

+        checkPermission(STATISTIC_READ);

+

+        ConnectPoint cp = new ConnectPoint(device.id(), pNumber);

+        return loadAllPortInternal(cp, liveType, instType);

+    }

+

+    @Override

+    public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadTopnByType(Device device,

+                                                                   TypedStoredFlowEntry.FlowLiveType liveType,

+                                                                   Instruction.Type instType,

+                                                                   int topn) {

+        checkPermission(STATISTIC_READ);

+

+        Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);

+

+        if (device == null) {

+            return allLoad;

+        }

+

+        List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));

+

+        for (Port port : ports) {

+            ConnectPoint cp = new ConnectPoint(device.id(), port.number());

+            List<TypedFlowEntryWithLoad> tfel = loadTopnPortInternal(cp, liveType, instType, topn);

+            allLoad.put(cp, tfel);

+        }

+

+        return allLoad;

+    }

+

+    @Override

+    public List<TypedFlowEntryWithLoad> loadTopnByType(Device device, PortNumber pNumber,

+                                                TypedStoredFlowEntry.FlowLiveType liveType,

+                                                Instruction.Type instType,

+                                                int topn) {

+        checkPermission(STATISTIC_READ);

+

+        ConnectPoint cp = new ConnectPoint(device.id(), pNumber);

+        return loadTopnPortInternal(cp, liveType, instType, topn);

+    }

+

+    private SummaryFlowEntryWithLoad loadSummaryPortInternal(ConnectPoint cp) {

+        checkPermission(STATISTIC_READ);

+

+        Set<FlowEntry> currentStats;

+        Set<FlowEntry> previousStats;

+

+        TypedStatistics typedStatistics;

+        synchronized (flowStatisticStore) {

+             currentStats = flowStatisticStore.getCurrentFlowStatistic(cp);

+            if (currentStats == null) {

+                return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());

+            }

+            previousStats = flowStatisticStore.getPreviousFlowStatistic(cp);

+            if (previousStats == null) {

+                return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());

+            }

+            // copy to local flow entry

+            typedStatistics = new TypedStatistics(currentStats, previousStats);

+

+            // Check for validity of this stats data

+            checkLoadValidity(currentStats, previousStats);

+        }

+

+        // current and previous set is not empty!

+        Set<FlowEntry> currentSet = typedStatistics.current();

+        Set<FlowEntry> previousSet = typedStatistics.previous();

+        Load totalLoad = new DefaultLoad(aggregateBytesSet(currentSet), aggregateBytesSet(previousSet),

+                TypedFlowEntryWithLoad.avgPollInterval());

+

+        Map<FlowRule, TypedStoredFlowEntry> currentMap;

+        Map<FlowRule, TypedStoredFlowEntry> previousMap;

+

+        currentMap = typedStatistics.currentImmediate();

+        previousMap = typedStatistics.previousImmediate();

+        Load immediateLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),

+                TypedFlowEntryWithLoad.shortPollInterval());

+

+        currentMap = typedStatistics.currentShort();

+        previousMap = typedStatistics.previousShort();

+        Load shortLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),

+                TypedFlowEntryWithLoad.shortPollInterval());

+

+        currentMap = typedStatistics.currentMid();

+        previousMap = typedStatistics.previousMid();

+        Load midLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),

+                TypedFlowEntryWithLoad.midPollInterval());

+

+        currentMap = typedStatistics.currentLong();

+        previousMap = typedStatistics.previousLong();

+        Load longLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),

+                TypedFlowEntryWithLoad.longPollInterval());

+

+        currentMap = typedStatistics.currentUnknown();

+        previousMap = typedStatistics.previousUnknown();

+        Load unknownLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),

+                TypedFlowEntryWithLoad.avgPollInterval());

+

+        return new SummaryFlowEntryWithLoad(cp, totalLoad, immediateLoad, shortLoad, midLoad, longLoad, unknownLoad);

+    }

+

+    private List<TypedFlowEntryWithLoad> loadAllPortInternal(ConnectPoint cp,

+                                                             TypedStoredFlowEntry.FlowLiveType liveType,

+                                                             Instruction.Type instType) {

+        checkPermission(STATISTIC_READ);

+

+        List<TypedFlowEntryWithLoad> retTFEL = new ArrayList<>();

+

+        Set<FlowEntry> currentStats;

+        Set<FlowEntry> previousStats;

+

+        TypedStatistics typedStatistics;

+        synchronized (flowStatisticStore) {

+            currentStats = flowStatisticStore.getCurrentFlowStatistic(cp);

+            if (currentStats == null) {

+                return retTFEL;

+            }

+            previousStats = flowStatisticStore.getPreviousFlowStatistic(cp);

+            if (previousStats == null) {

+                return retTFEL;

+            }

+            // copy to local flow entry set

+            typedStatistics = new TypedStatistics(currentStats, previousStats);

+

+            // Check for validity of this stats data

+            checkLoadValidity(currentStats, previousStats);

+        }

+

+        // current and previous set is not empty!

+        boolean isAllLiveType = (liveType == null ? true : false); // null is all live type

+        boolean isAllInstType = (instType == null ? true : false); // null is all inst type

+

+        Map<FlowRule, TypedStoredFlowEntry> currentMap;

+        Map<FlowRule, TypedStoredFlowEntry> previousMap;

+

+        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW) {

+            currentMap = typedStatistics.currentImmediate();

+            previousMap = typedStatistics.previousImmediate();

+

+            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,

+                    isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval());

+            if (fel.size() > 0) {

+                retTFEL.addAll(fel);

+            }

+        }

+

+        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW) {

+            currentMap = typedStatistics.currentShort();

+            previousMap = typedStatistics.previousShort();

+

+            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,

+                    isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval());

+            if (fel.size() > 0) {

+                retTFEL.addAll(fel);

+            }

+        }

+

+        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.MID_FLOW) {

+            currentMap = typedStatistics.currentMid();

+            previousMap = typedStatistics.previousMid();

+

+            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,

+                    isAllInstType, instType, TypedFlowEntryWithLoad.midPollInterval());

+            if (fel.size() > 0) {

+                retTFEL.addAll(fel);

+            }

+        }

+

+        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.LONG_FLOW) {

+            currentMap = typedStatistics.currentLong();

+            previousMap = typedStatistics.previousLong();

+

+            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,

+                    isAllInstType, instType, TypedFlowEntryWithLoad.longPollInterval());

+            if (fel.size() > 0) {

+                retTFEL.addAll(fel);

+            }

+        }

+

+        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW) {

+            currentMap = typedStatistics.currentUnknown();

+            previousMap = typedStatistics.previousUnknown();

+

+            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,

+                    isAllInstType, instType, TypedFlowEntryWithLoad.avgPollInterval());

+            if (fel.size() > 0) {

+                retTFEL.addAll(fel);

+            }

+        }

+

+        return retTFEL;

+    }

+

+    private List<TypedFlowEntryWithLoad> typedFlowEntryLoadByInstInternal(ConnectPoint cp,

+                                                                      Map<FlowRule, TypedStoredFlowEntry> currentMap,

+                                                                      Map<FlowRule, TypedStoredFlowEntry> previousMap,

+                                                                      boolean isAllInstType,

+                                                                      Instruction.Type instType,

+                                                                      int liveTypePollInterval) {

+        List<TypedFlowEntryWithLoad> fel = new ArrayList<>();

+

+        for (TypedStoredFlowEntry tfe : currentMap.values()) {

+            if (isAllInstType ||

+                    tfe.treatment().allInstructions().stream().

+                            filter(i -> i.type() == instType).

+                            findAny().isPresent()) {

+                long currentBytes = tfe.bytes();

+                long previousBytes = previousMap.getOrDefault(tfe, new DefaultTypedFlowEntry((FlowRule) tfe)).bytes();

+                Load fLoad = new DefaultLoad(currentBytes, previousBytes, liveTypePollInterval);

+                fel.add(new TypedFlowEntryWithLoad(cp, tfe, fLoad));

+            }

+        }

+

+        return fel;

+    }

+

+    private List<TypedFlowEntryWithLoad> loadTopnPortInternal(ConnectPoint cp,

+                                                             TypedStoredFlowEntry.FlowLiveType liveType,

+                                                             Instruction.Type instType,

+                                                             int topn) {

+        List<TypedFlowEntryWithLoad> fel = loadAllPortInternal(cp, liveType, instType);

+

+        // Sort with descending order of load

+        List<TypedFlowEntryWithLoad> tfel =

+                fel.stream().sorted(Comparators.TYPEFLOWENTRY_WITHLOAD_COMPARATOR).

+                        limit(topn).collect(Collectors.toList());

+

+        return tfel;

+    }

+

+    private long aggregateBytesSet(Set<FlowEntry> setFE) {

+        return setFE.stream().mapToLong(FlowEntry::bytes).sum();

+    }

+

+    private long aggregateBytesMap(Map<FlowRule, TypedStoredFlowEntry> mapFE) {

+        return mapFE.values().stream().mapToLong(FlowEntry::bytes).sum();

+    }

+

+    /**

+     * Internal data class holding two set of typed flow entries.

+     */

+    private static class TypedStatistics {

+        private final ImmutableSet<FlowEntry> currentAll;

+        private final ImmutableSet<FlowEntry> previousAll;

+

+        private final Map<FlowRule, TypedStoredFlowEntry> currentImmediate = new HashMap<>();

+        private final Map<FlowRule, TypedStoredFlowEntry> previousImmediate = new HashMap<>();

+

+        private final Map<FlowRule, TypedStoredFlowEntry> currentShort = new HashMap<>();

+        private final Map<FlowRule, TypedStoredFlowEntry> previousShort = new HashMap<>();

+

+        private final Map<FlowRule, TypedStoredFlowEntry> currentMid = new HashMap<>();

+        private final Map<FlowRule, TypedStoredFlowEntry> previousMid = new HashMap<>();

+

+        private final Map<FlowRule, TypedStoredFlowEntry> currentLong = new HashMap<>();

+        private final Map<FlowRule, TypedStoredFlowEntry> previousLong = new HashMap<>();

+

+        private final Map<FlowRule, TypedStoredFlowEntry> currentUnknown = new HashMap<>();

+        private final Map<FlowRule, TypedStoredFlowEntry> previousUnknown = new HashMap<>();

+

+        public TypedStatistics(Set<FlowEntry> current, Set<FlowEntry> previous) {

+            this.currentAll = ImmutableSet.copyOf(checkNotNull(current));

+            this.previousAll = ImmutableSet.copyOf(checkNotNull(previous));

+

+            currentAll.forEach(fe -> {

+                TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe);

+

+                switch (tfe.flowLiveType()) {

+                    case IMMEDIATE_FLOW:

+                        currentImmediate.put(fe, tfe);

+                        break;

+                    case SHORT_FLOW:

+                        currentShort.put(fe, tfe);

+                        break;

+                    case MID_FLOW:

+                        currentMid.put(fe, tfe);

+                        break;

+                    case LONG_FLOW:

+                        currentLong.put(fe, tfe);

+                        break;

+                    default:

+                        currentUnknown.put(fe, tfe);

+                        break;

+                }

+            });

+

+            previousAll.forEach(fe -> {

+                TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe);

+

+                switch (tfe.flowLiveType()) {

+                    case IMMEDIATE_FLOW:

+                        if (currentImmediate.containsKey(fe)) {

+                            previousImmediate.put(fe, tfe);

+                        } else if (currentShort.containsKey(fe)) {

+                            previousShort.put(fe, tfe);

+                        } else if (currentMid.containsKey(fe)) {

+                            previousMid.put(fe, tfe);

+                        } else if (currentLong.containsKey(fe)) {

+                            previousLong.put(fe, tfe);

+                        } else {

+                            previousUnknown.put(fe, tfe);

+                        }

+                        break;

+                    case SHORT_FLOW:

+                        if (currentShort.containsKey(fe)) {

+                            previousShort.put(fe, tfe);

+                        } else if (currentMid.containsKey(fe)) {

+                            previousMid.put(fe, tfe);

+                        } else if (currentLong.containsKey(fe)) {

+                            previousLong.put(fe, tfe);

+                        } else {

+                            previousUnknown.put(fe, tfe);

+                        }

+                        break;

+                    case MID_FLOW:

+                        if (currentMid.containsKey(fe)) {

+                            previousMid.put(fe, tfe);

+                        } else if (currentLong.containsKey(fe)) {

+                            previousLong.put(fe, tfe);

+                        } else {

+                            previousUnknown.put(fe, tfe);

+                        }

+                        break;

+                    case LONG_FLOW:

+                        if (currentLong.containsKey(fe)) {

+                            previousLong.put(fe, tfe);

+                        } else {

+                            previousUnknown.put(fe, tfe);

+                        }

+                        break;

+                    default:

+                        previousUnknown.put(fe, tfe);

+                        break;

+                }

+            });

+        }

+

+        /**

+         * Returns flow entries as the current value.

+         *

+         * @return flow entries as the current value

+         */

+        public ImmutableSet<FlowEntry> current() {

+            return currentAll;

+        }

+

+        /**

+         * Returns flow entries as the previous value.

+         *

+         * @return flow entries as the previous value

+         */

+        public ImmutableSet<FlowEntry> previous() {

+            return previousAll;

+        }

+

+        public Map<FlowRule, TypedStoredFlowEntry> currentImmediate() {

+            return currentImmediate;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> previousImmediate() {

+            return previousImmediate;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> currentShort() {

+            return currentShort;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> previousShort() {

+            return previousShort;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> currentMid() {

+            return currentMid;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> previousMid() {

+            return previousMid;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> currentLong() {

+            return currentLong;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> previousLong() {

+            return previousLong;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> currentUnknown() {

+            return currentUnknown;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> previousUnknown() {

+            return previousUnknown;

+        }

+

+        /**

+         * Validates values are not empty.

+         *

+         * @return false if either of the sets is empty. Otherwise, true.

+         */

+        public boolean isValid() {

+            return !(currentAll.isEmpty() || previousAll.isEmpty());

+        }

+

+        @Override

+        public int hashCode() {

+            return Objects.hash(currentAll, previousAll);

+        }

+

+        @Override

+        public boolean equals(Object obj) {

+            if (this == obj) {

+                return true;

+            }

+            if (!(obj instanceof TypedStatistics)) {

+                return false;

+            }

+            final TypedStatistics other = (TypedStatistics) obj;

+            return Objects.equals(this.currentAll, other.currentAll) &&

+                    Objects.equals(this.previousAll, other.previousAll);

+        }

+

+        @Override

+        public String toString() {

+            return MoreObjects.toStringHelper(this)

+                    .add("current", currentAll)

+                    .add("previous", previousAll)

+                    .toString();

+        }

+    }

+

+    private void checkLoadValidity(Set<FlowEntry> current, Set<FlowEntry> previous) {

+        current.stream().forEach(c -> {

+            FlowEntry f = previous.stream().filter(p -> c.equals(p)).

+                    findAny().orElse(null);

+            if (f != null && c.bytes() < f.bytes()) {

+                log.debug("FlowStatisticManager:checkLoadValidity():" +

+                        "Error: " + c + " :Previous bytes=" + f.bytes() +

+                        " is larger than current bytes=" + c.bytes() + " !!!");

+            }

+        });

+

+    }

+

+    /**

+     * Creates a predicate that checks the instruction type of a flow entry is the same as

+     * the specified instruction type.

+     *

+     * @param instType instruction type to be checked

+     * @return predicate

+     */

+    private static Predicate<FlowEntry> hasInstructionType(Instruction.Type instType) {

+        return new Predicate<FlowEntry>() {

+            @Override

+            public boolean apply(FlowEntry flowEntry) {

+                List<Instruction> allInstructions = flowEntry.treatment().allInstructions();

+

+                return allInstructions.stream().filter(i -> i.type() == instType).findAny().isPresent();

+            }

+        };

+    }

+

+    /**

+     * Internal flow rule event listener for FlowStatisticManager.

+     */

+    private class InternalFlowRuleStatsListener implements FlowRuleListener {

+

+        @Override

+        public void event(FlowRuleEvent event) {

+            FlowRule rule = event.subject();

+            switch (event.type()) {

+                case RULE_ADDED:

+                    if (rule instanceof FlowEntry) {

+                        flowStatisticStore.addFlowStatistic((FlowEntry) rule);

+                    }

+                    break;

+                case RULE_UPDATED:

+                    flowStatisticStore.updateFlowStatistic((FlowEntry) rule);

+                    break;

+                case RULE_ADD_REQUESTED:

+                    break;

+                case RULE_REMOVE_REQUESTED:

+                    break;

+                case RULE_REMOVED:

+                    flowStatisticStore.removeFlowStatistic(rule);

+                    break;

+                default:

+                    log.warn("Unknown flow rule event {}", event);

+            }

+        }

+    }

+}

diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
new file mode 100644
index 0000000..40c3e2d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
@@ -0,0 +1,288 @@
+/*

+ * Copyright 2015 Open Networking Laboratory

+ *

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+package org.onosproject.store.statistic.impl;

+

+import com.google.common.base.Objects;

+import org.apache.felix.scr.annotations.Activate;

+import org.apache.felix.scr.annotations.Component;

+import org.apache.felix.scr.annotations.Deactivate;

+import org.apache.felix.scr.annotations.Reference;

+import org.apache.felix.scr.annotations.ReferenceCardinality;

+import org.apache.felix.scr.annotations.Service;

+import org.onlab.util.KryoNamespace;

+import org.onlab.util.Tools;

+import org.onosproject.cluster.ClusterService;

+import org.onosproject.cluster.NodeId;

+import org.onosproject.mastership.MastershipService;

+import org.onosproject.net.ConnectPoint;

+import org.onosproject.net.DeviceId;

+import org.onosproject.net.PortNumber;

+import org.onosproject.net.flow.FlowEntry;

+import org.onosproject.net.flow.FlowRule;

+import org.onosproject.net.flow.instructions.Instruction;

+import org.onosproject.net.flow.instructions.Instructions;

+import org.onosproject.net.statistic.FlowStatisticStore;

+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;

+import org.onosproject.store.serializers.KryoNamespaces;

+import org.onosproject.store.serializers.KryoSerializer;

+import org.slf4j.Logger;

+

+import java.util.Collections;

+import java.util.HashSet;

+import java.util.Map;

+import java.util.Set;

+import java.util.concurrent.ConcurrentHashMap;

+import java.util.concurrent.ExecutorService;

+import java.util.concurrent.Executors;

+import java.util.concurrent.TimeUnit;

+

+import static org.onlab.util.Tools.groupedThreads;

+import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;

+import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;

+import static org.slf4j.LoggerFactory.getLogger;

+

+/**

+ * Maintains flow statistics using RPC calls to collect stats from remote instances

+ * on demand.

+ */

+@Component(immediate = true)

+@Service

+public class DistributedFlowStatisticStore implements FlowStatisticStore {

+    private final Logger log = getLogger(getClass());

+

+    // TODO: Make configurable.

+    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;

+

+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)

+    protected MastershipService mastershipService;

+

+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)

+    protected ClusterCommunicationService clusterCommunicator;

+

+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)

+    protected ClusterService clusterService;

+

+    private Map<ConnectPoint, Set<FlowEntry>> previous =

+            new ConcurrentHashMap<>();

+

+    private Map<ConnectPoint, Set<FlowEntry>> current =

+            new ConcurrentHashMap<>();

+

+    protected static final KryoSerializer SERIALIZER = new KryoSerializer() {

+        @Override

+        protected void setupKryoPool() {

+            serializerPool = KryoNamespace.newBuilder()

+                    .register(KryoNamespaces.API)

+                    .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)

+                            // register this store specific classes here

+                    .build();

+        }

+    };

+

+    private NodeId local;

+    private ExecutorService messageHandlingExecutor;

+

+    private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;

+

+    @Activate

+    public void activate() {

+        local = clusterService.getLocalNode().id();

+

+        messageHandlingExecutor = Executors.newFixedThreadPool(

+                MESSAGE_HANDLER_THREAD_POOL_SIZE,

+                groupedThreads("onos/store/statistic", "message-handlers"));

+

+        clusterCommunicator.addSubscriber(

+                GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,

+                messageHandlingExecutor);

+

+        clusterCommunicator.addSubscriber(

+                GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,

+                messageHandlingExecutor);

+

+        log.info("Started");

+    }

+

+    @Deactivate

+    public void deactivate() {

+        clusterCommunicator.removeSubscriber(GET_PREVIOUS);

+        clusterCommunicator.removeSubscriber(GET_CURRENT);

+        messageHandlingExecutor.shutdown();

+        log.info("Stopped");

+    }

+

+    @Override

+    public synchronized void removeFlowStatistic(FlowRule rule) {

+        ConnectPoint cp = buildConnectPoint(rule);

+        if (cp == null) {

+            return;

+        }

+

+        // remove this rule if present from current map

+        current.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e;  });

+

+        // remove this on if present from previous map

+        previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });

+    }

+

+    @Override

+    public synchronized void addFlowStatistic(FlowEntry rule) {

+        ConnectPoint cp = buildConnectPoint(rule);

+        if (cp == null) {

+            return;

+        }

+

+        // create one if absent and add this rule

+        current.putIfAbsent(cp, new HashSet<>());

+        current.computeIfPresent(cp, (c, e) -> { e.add(rule); return e; });

+

+        // remove previous one if present

+        previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });

+    }

+

+    public synchronized void updateFlowStatistic(FlowEntry rule) {

+        ConnectPoint cp = buildConnectPoint(rule);

+        if (cp == null) {

+            return;

+        }

+

+        Set<FlowEntry> curr = current.get(cp);

+        if (curr == null) {

+            addFlowStatistic(rule);

+        } else {

+            FlowEntry f = curr.stream().filter(c -> rule.equals(c)).

+                    findAny().orElse(null);

+            if (rule.bytes() < f.bytes()) {

+                log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +

+                        " Invalid Flow Update! Will be removed!!" +

+                        " curr flowId=" + Long.toHexString(rule.id().value()) +

+                        ", prev flowId=" + Long.toHexString(f.id().value()) +

+                        ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.bytes() +

+                        ", curr life=" + rule.life() + ", prev life=" + f.life() +

+                        ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.lastSeen());

+                // something is wrong! invalid flow entry, so delete it

+                removeFlowStatistic(rule);

+                return;

+            }

+            Set<FlowEntry> prev = previous.get(cp);

+            if (prev == null) {

+                prev = new HashSet<>();

+                previous.put(cp, prev);

+            }

+

+            // previous one is exist

+            if (f != null) {

+                // remove old one and add new one

+                prev.remove(rule);

+                if (!prev.add(f)) {

+                    log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +

+                                    " flowId={}, add failed into previous.",

+                            Long.toHexString(rule.id().value()));

+                }

+            }

+

+            // remove old one and add new one

+            curr.remove(rule);

+            if (!curr.add(rule)) {

+                log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +

+                                " flowId={}, add failed into current.",

+                        Long.toHexString(rule.id().value()));

+            }

+        }

+    }

+

+    @Override

+    public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {

+        final DeviceId deviceId = connectPoint.deviceId();

+

+        NodeId master = mastershipService.getMasterFor(deviceId);

+        if (master == null) {

+            log.warn("No master for {}", deviceId);

+            return Collections.emptySet();

+        }

+

+        if (Objects.equal(local, master)) {

+            return getCurrentStatisticInternal(connectPoint);

+        } else {

+            return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(

+                            connectPoint,

+                            GET_CURRENT,

+                            SERIALIZER::encode,

+                            SERIALIZER::decode,

+                            master),

+                    STATISTIC_STORE_TIMEOUT_MILLIS,

+                    TimeUnit.MILLISECONDS,

+                    Collections.emptySet());

+        }

+    }

+

+    private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {

+        return current.get(connectPoint);

+    }

+

+    @Override

+    public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {

+        final DeviceId deviceId = connectPoint.deviceId();

+

+        NodeId master = mastershipService.getMasterFor(deviceId);

+        if (master == null) {

+            log.warn("No master for {}", deviceId);

+            return Collections.emptySet();

+        }

+

+        if (Objects.equal(local, master)) {

+            return getPreviousStatisticInternal(connectPoint);

+        } else {

+            return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(

+                            connectPoint,

+                            GET_PREVIOUS,

+                            SERIALIZER::encode,

+                            SERIALIZER::decode,

+                            master),

+                    STATISTIC_STORE_TIMEOUT_MILLIS,

+                    TimeUnit.MILLISECONDS,

+                    Collections.emptySet());

+        }

+    }

+

+    private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {

+        return previous.get(connectPoint);

+    }

+

+    private ConnectPoint buildConnectPoint(FlowRule rule) {

+        PortNumber port = getOutput(rule);

+

+        if (port == null) {

+            return null;

+        }

+        ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);

+        return cp;

+    }

+

+    private PortNumber getOutput(FlowRule rule) {

+        for (Instruction i : rule.treatment().allInstructions()) {

+            if (i.type() == Instruction.Type.OUTPUT) {

+                Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;

+                return out.port();

+            }

+            if (i.type() == Instruction.Type.DROP) {

+                return PortNumber.P0;

+            }

+        }

+        return null;

+    }

+}
\ No newline at end of file
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/NewAdaptiveFlowStatsCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/NewAdaptiveFlowStatsCollector.java
index 181ba00..a81367c 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/NewAdaptiveFlowStatsCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/NewAdaptiveFlowStatsCollector.java
@@ -717,25 +717,9 @@
             long curTime = (cTime > 0 ? cTime : System.currentTimeMillis());

             // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply

             long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000);

-

             // fe.life() unit is SECOND!

             long liveTime = fe.life() + fromLastSeen;

 

-            // check flow timeout

-            if (fe.timeout() > calAndPollInterval && fromLastSeen > fe.timeout()) {

-                if (!fe.isPermanent()) {

-                    log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value())

-                                    + ", liveType=" + fe.flowLiveType()

-                                    + ", liveTime=" + liveTime

-                                    + ", life=" + fe.life()

-                                    + ", fromLastSeen=" + fromLastSeen

-                                    + ", timeout=" + fe.timeout()

-                                    + ", isPermanent=" + fe.isPermanent()

-                                    + " AdaptiveStats collection thread for {}",

-                            sw.getStringId());

-                    return false;

-                }

-            }

 

             switch (fe.flowLiveType()) {

                 case IMMEDIATE_FLOW: