aggregate flow replies on io thread

Change-Id: I622290f213ee830cfab7e4bd4ad7a52f612b475e
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index 2611a6d..d681524 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -355,7 +355,7 @@
 
         @Override
         public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
-            List<FlowEntry> storedRules = Lists.newLinkedList(store.getFlowEntries(deviceId));
+            Set<FlowEntry> storedRules = Sets.newHashSet(store.getFlowEntries(deviceId));
 
             for (FlowEntry rule : flowEntries) {
                 if (storedRules.remove(rule)) {
diff --git a/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index 9ac8b5b..edd2f3a 100644
--- a/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -17,6 +17,7 @@
 
 import static org.onlab.util.Tools.namedThreads;
 
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -25,6 +26,7 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.collect.Lists;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -41,11 +43,15 @@
 import org.onosproject.openflow.controller.driver.OpenFlowAgent;
 import org.projectfloodlight.openflow.protocol.OFCircuitPortStatus;
 import org.projectfloodlight.openflow.protocol.OFExperimenter;
+import org.projectfloodlight.openflow.protocol.OFFactories;
+import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
+import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
 import org.projectfloodlight.openflow.protocol.OFMessage;
 import org.projectfloodlight.openflow.protocol.OFPacketIn;
 import org.projectfloodlight.openflow.protocol.OFPortDesc;
 import org.projectfloodlight.openflow.protocol.OFPortStatus;
 import org.projectfloodlight.openflow.protocol.OFStatsReply;
+import org.projectfloodlight.openflow.protocol.OFStatsReplyFlags;
 import org.projectfloodlight.openflow.protocol.OFStatsType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,6 +85,9 @@
 
     protected Set<OpenFlowEventListener> ofEventListener = Sets.newHashSet();
 
+    protected Multimap<Dpid, OFFlowStatsEntry> fullStats =
+            ArrayListMultimap.create();
+
     private final Controller ctrl = new Controller();
 
     @Activate
@@ -160,6 +169,7 @@
 
     @Override
     public void processPacket(Dpid dpid, OFMessage msg) {
+        Collection<OFFlowStatsEntry> stats;
         switch (msg.getType()) {
         case PORT_STATUS:
             for (OpenFlowSwitchListener l : ofSwitchListener) {
@@ -186,7 +196,15 @@
                     l.switchChanged(dpid);
                 }
             }
-            // fall through to invoke handler
+            stats = publishStats(dpid, reply);
+            if (stats != null) {
+                OFFlowStatsReply.Builder rep =
+                        OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
+                rep.setEntries(Lists.newLinkedList(stats));
+                executor.submit(new OFMessageHandler(dpid, rep.build()));
+            }
+            break;
+
         case FLOW_REMOVED:
         case ERROR:
         case BARRIER_REPLY:
@@ -218,6 +236,20 @@
         }
     }
 
+    private synchronized Collection<OFFlowStatsEntry> publishStats(Dpid dpid,
+                                                                   OFStatsReply reply) {
+        //TODO: Get rid of synchronized
+        if (reply.getStatsType() != OFStatsType.FLOW) {
+            return null;
+        }
+        final OFFlowStatsReply replies = (OFFlowStatsReply) reply;
+        fullStats.putAll(dpid, replies.getEntries());
+        if (!reply.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
+            return fullStats.removeAll(dpid);
+        }
+        return null;
+    }
+
     @Override
     public void setRole(Dpid dpid, RoleState role) {
         final OpenFlowSwitch sw = getSwitch(dpid);
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index fc05b6f..0744b1b 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -58,8 +58,6 @@
 import org.projectfloodlight.openflow.protocol.OFMessage;
 import org.projectfloodlight.openflow.protocol.OFPortStatus;
 import org.projectfloodlight.openflow.protocol.OFStatsReply;
-import org.projectfloodlight.openflow.protocol.OFStatsReplyFlags;
-import org.projectfloodlight.openflow.protocol.OFStatsType;
 import org.projectfloodlight.openflow.protocol.OFVersion;
 import org.projectfloodlight.openflow.protocol.action.OFAction;
 import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
@@ -332,24 +330,17 @@
         }
 
         private synchronized void pushFlowMetrics(Dpid dpid, OFStatsReply stats) {
-            if (stats.getStatsType() != OFStatsType.FLOW) {
-                return;
-            }
+
             DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
             final OFFlowStatsReply replies = (OFFlowStatsReply) stats;
-            //final List<FlowRule> entries = Lists.newLinkedList();
 
-            for (OFFlowStatsEntry reply : replies.getEntries()) {
-                if (!tableMissRule(dpid, reply)) {
-                    completeEntries.put(did, new FlowEntryBuilder(dpid, reply).build());
-                }
-            }
+            List<FlowEntry> flowEntries = replies.getEntries().stream()
+                    .filter(entry -> !tableMissRule(dpid, entry))
+                    .map(entry -> new FlowEntryBuilder(dpid, entry).build())
+                    .collect(Collectors.toList());
 
-            if (!stats.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
-                log.trace("sending flowstats to core {}", completeEntries.get(did));
-                providerService.pushFlowMetrics(did, completeEntries.get(did));
-                completeEntries.removeAll(did);
-            }
+            providerService.pushFlowMetrics(did, flowEntries);
+
         }
 
         private boolean tableMissRule(Dpid dpid, OFFlowStatsEntry reply) {