Aggregate flow stats replies on the I/O thread

Change-Id: I622290f213ee830cfab7e4bd4ad7a52f612b475e
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index fc05b6f..0744b1b 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -58,8 +58,6 @@
 import org.projectfloodlight.openflow.protocol.OFMessage;
 import org.projectfloodlight.openflow.protocol.OFPortStatus;
 import org.projectfloodlight.openflow.protocol.OFStatsReply;
-import org.projectfloodlight.openflow.protocol.OFStatsReplyFlags;
-import org.projectfloodlight.openflow.protocol.OFStatsType;
 import org.projectfloodlight.openflow.protocol.OFVersion;
 import org.projectfloodlight.openflow.protocol.action.OFAction;
 import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
@@ -332,24 +330,17 @@
         }
 
         private synchronized void pushFlowMetrics(Dpid dpid, OFStatsReply stats) {
-            if (stats.getStatsType() != OFStatsType.FLOW) {
-                return;
-            }
+
             DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
             final OFFlowStatsReply replies = (OFFlowStatsReply) stats;
-            //final List<FlowRule> entries = Lists.newLinkedList();
 
-            for (OFFlowStatsEntry reply : replies.getEntries()) {
-                if (!tableMissRule(dpid, reply)) {
-                    completeEntries.put(did, new FlowEntryBuilder(dpid, reply).build());
-                }
-            }
+            List<FlowEntry> flowEntries = replies.getEntries().stream()
+                    .filter(entry -> !tableMissRule(dpid, entry))
+                    .map(entry -> new FlowEntryBuilder(dpid, entry).build())
+                    .collect(Collectors.toList());
 
-            if (!stats.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
-                log.trace("sending flowstats to core {}", completeEntries.get(did));
-                providerService.pushFlowMetrics(did, completeEntries.get(did));
-                completeEntries.removeAll(did);
-            }
+            providerService.pushFlowMetrics(did, flowEntries);
+
         }
 
         private boolean tableMissRule(Dpid dpid, OFFlowStatsEntry reply) {