supporting multipart stats replies: tested up to 15K rules

Change-Id: I36fbd99d012a74c1f5240f37b60d3b58be85626c
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index 7ddc65d..eeffe85 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -2,7 +2,6 @@
 
 import static org.slf4j.LoggerFactory.getLogger;
 
-import java.util.List;
 import java.util.Map;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -29,11 +28,13 @@
 import org.projectfloodlight.openflow.protocol.OFMessage;
 import org.projectfloodlight.openflow.protocol.OFPortStatus;
 import org.projectfloodlight.openflow.protocol.OFStatsReply;
+import org.projectfloodlight.openflow.protocol.OFStatsReplyFlags;
 import org.projectfloodlight.openflow.protocol.OFStatsType;
 import org.slf4j.Logger;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 
 /**
  * Provider which uses an OpenFlow controller to detect network
@@ -114,6 +115,8 @@
     implements OpenFlowSwitchListener, OpenFlowEventListener {
 
         private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
+        private final Multimap<DeviceId, FlowRule> completeEntries =
+                ArrayListMultimap.create();
 
         @Override
         public void switchAdded(Dpid dpid) {
@@ -153,17 +156,24 @@
 
         }
 
-        private void pushFlowMetrics(Dpid dpid, OFStatsReply stats) {
+        private synchronized void pushFlowMetrics(Dpid dpid, OFStatsReply stats) {
             if (stats.getStatsType() != OFStatsType.FLOW) {
                 return;
             }
+            DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
             final OFFlowStatsReply replies = (OFFlowStatsReply) stats;
-            final List<FlowRule> entries = Lists.newLinkedList();
+            //final List<FlowRule> entries = Lists.newLinkedList();
+
             for (OFFlowStatsEntry reply : replies.getEntries()) {
-                entries.add(new FlowRuleBuilder(dpid, reply).build());
+                completeEntries.put(did, new FlowRuleBuilder(dpid, reply).build());
+                //entries.add(new FlowRuleBuilder(dpid, reply).build());
             }
-            log.debug("sending flowstats to core {}", entries);
-            providerService.pushFlowMetrics(DeviceId.deviceId(Dpid.uri(dpid)), entries);
+
+            if (!stats.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
+                log.debug("sending flowstats to core {}", completeEntries.get(did));
+                providerService.pushFlowMetrics(did, completeEntries.get(did));
+                completeEntries.removeAll(did);
+            }
         }
 
     }