[Emu] openTAM: NewAdaptiveFlowStatsCollector Implementation

  - NewAdaptiveFlowStatsCollector.java
   .Bug fix to initialize callCountCalAndShortFlowsTask value
   .Added flowMissingXid variable to identify individual StatsRequest or match all StatsRequest message or not
  - DefaultTypedFlowEntry.java, TypedStoredFlowEntry.java
   .Added javadoc for class
  - OpenFlowRuleProvider.java
   .Line 2: 2014 -> 2015
   .Added adaptiveFlowSampling boolean property with default
   .Added call providerService.pushFlowMetricsWithoutFlowMissing in case of  individual StatsRequest
  - FlowRuleProviderService.java
   .Added pushFlowMetricsWithoutFlowMissing() function
  - FlowRuleManager.java
   .Added pushFlowMetricsWithoutFlowMissing() implementation
  - OpenFlowControllerImpl.java
   .Bug fix to unchange the StatsRequest Xid value in case of StatsReply Flow message type

Change-Id: Id4dc4a164da654af7b6dfb090af7336e748ef118
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index de079e0..949c657 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Open Networking Laboratory
+ * Copyright 2015 Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -76,6 +76,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.onlab.util.Tools.get;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -99,11 +100,16 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ComponentConfigService cfgService;
 
-    private static final int DEFAULT_POLL_FREQUENCY = 10;
+    private static final int DEFAULT_POLL_FREQUENCY = 5;
     @Property(name = "flowPollFrequency", intValue = DEFAULT_POLL_FREQUENCY,
             label = "Frequency (in seconds) for polling flow statistics")
     private int flowPollFrequency = DEFAULT_POLL_FREQUENCY;
 
+    private static final boolean DEFAULT_ADAPTIVE_FLOW_SAMPLING = true;
+    @Property(name = "adaptiveFlowSampling", boolValue = DEFAULT_ADAPTIVE_FLOW_SAMPLING,
+            label = "Adaptive Flow Sampling is on or off")
+    private boolean adaptiveFlowSampling = DEFAULT_ADAPTIVE_FLOW_SAMPLING;
+
     private FlowRuleProviderService providerService;
 
     private final InternalFlowProvider listener = new InternalFlowProvider();
@@ -111,7 +117,10 @@
     private Cache<Long, InternalCacheEntry> pendingBatches;
 
     private final Timer timer = new Timer("onos-openflow-collector");
-    private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
+    private final Map<Dpid, FlowStatsCollector> simpleCollectors = Maps.newHashMap();
+
+    // NewAdaptiveFlowStatsCollector Set
+    private final Map<Dpid, NewAdaptiveFlowStatsCollector> afsCollectors = Maps.newHashMap();
 
     /**
      * Creates an OpenFlow host provider.
@@ -128,9 +137,11 @@
         controller.addEventListener(listener);
 
         pendingBatches = createBatchCache();
+
         createCollectors();
 
-        log.info("Started");
+        log.info("Started with flowPollFrequency = {}, adaptiveFlowSampling = {}",
+                flowPollFrequency, adaptiveFlowSampling);
     }
 
     @Deactivate
@@ -161,6 +172,20 @@
         }
 
         log.info("Settings: flowPollFrequency={}", flowPollFrequency);
+
+        boolean newAdaptiveFlowSampling;
+        String s = get(properties, "adaptiveFlowSampling");
+        newAdaptiveFlowSampling = isNullOrEmpty(s) ? adaptiveFlowSampling : Boolean.parseBoolean(s.trim());
+
+        if (newAdaptiveFlowSampling != adaptiveFlowSampling) {
+            // stop previous collector
+            stopCollectors();
+            adaptiveFlowSampling = newAdaptiveFlowSampling;
+            // create new collectors
+            createCollectors();
+        }
+
+        log.info("Settings: adaptiveFlowSampling={}", adaptiveFlowSampling);
     }
 
     private Cache<Long, InternalCacheEntry> createBatchCache() {
@@ -179,19 +204,38 @@
     }
 
     private void createCollector(OpenFlowSwitch sw) {
-        FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency);
-        fsc.start();
-        collectors.put(new Dpid(sw.getId()), fsc);
+        if (adaptiveFlowSampling) {
+            // NewAdaptiveFlowStatsCollector Constructor
+            NewAdaptiveFlowStatsCollector fsc = new NewAdaptiveFlowStatsCollector(sw, flowPollFrequency);
+            fsc.start();
+            afsCollectors.put(new Dpid(sw.getId()), fsc);
+        } else {
+            FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency);
+            fsc.start();
+            simpleCollectors.put(new Dpid(sw.getId()), fsc);
+        }
     }
 
     private void stopCollectors() {
-        collectors.values().forEach(FlowStatsCollector::stop);
-        collectors.clear();
+        if (adaptiveFlowSampling) {
+            // NewAdaptiveFlowStatsCollector Destructor
+            afsCollectors.values().forEach(NewAdaptiveFlowStatsCollector::stop);
+            afsCollectors.clear();
+        } else {
+            simpleCollectors.values().forEach(FlowStatsCollector::stop);
+            simpleCollectors.clear();
+        }
     }
 
     private void adjustRate() {
         DefaultLoad.setPollInterval(flowPollFrequency);
-        collectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency));
+
+        if (adaptiveFlowSampling) {
+            // NewAdaptiveFlowStatsCollector calAndPollInterval
+            afsCollectors.values().forEach(fsc -> fsc.adjustCalAndPollInterval(flowPollFrequency));
+        } else {
+            simpleCollectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency));
+        }
     }
 
     @Override
@@ -202,8 +246,9 @@
     }
 
     private void applyRule(FlowRule flowRule) {
-        OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
-                                                                   .uri()));
+        Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
+        OpenFlowSwitch sw = controller.getSwitch(dpid);
+
         FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
         if (hasPayload(flowRuleExtPayLoad)) {
             OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
@@ -212,6 +257,11 @@
         }
         sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
                                           Optional.empty()).buildFlowAdd());
+
+        if (adaptiveFlowSampling) {
+            // Add TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+            afsCollectors.get(dpid).addWithFlowRule(flowRule);
+        }
     }
 
     @Override
@@ -222,8 +272,9 @@
     }
 
     private void removeRule(FlowRule flowRule) {
-        OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
-                                                                   .uri()));
+        Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
+        OpenFlowSwitch sw = controller.getSwitch(dpid);
+
         FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
         if (hasPayload(flowRuleExtPayLoad)) {
             OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
@@ -232,6 +283,11 @@
         }
         sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
                                           Optional.empty()).buildFlowDel());
+
+        if (adaptiveFlowSampling) {
+            // Remove TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+            afsCollectors.get(dpid).removeFlows(flowRule);
+        }
     }
 
     @Override
@@ -242,11 +298,12 @@
 
     @Override
     public void executeBatch(FlowRuleBatchOperation batch) {
+        checkNotNull(batch);
 
         pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
 
-        OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId()
-                                                                   .uri()));
+        Dpid dpid = Dpid.dpid(batch.deviceId().uri());
+        OpenFlowSwitch sw = controller.getSwitch(dpid);
         OFFlowMod mod;
         for (FlowRuleBatchEntry fbe : batch.getOperations()) {
             // flow is the third party privacy flow
@@ -262,16 +319,32 @@
             switch (fbe.operator()) {
                 case ADD:
                     mod = builder.buildFlowAdd();
+
+                    if (adaptiveFlowSampling) {
+                        // Add TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+                        afsCollectors.get(dpid).addWithFlowRule(fbe.target());
+                    }
                     break;
                 case REMOVE:
                     mod = builder.buildFlowDel();
+
+                    if (adaptiveFlowSampling) {
+                        // Remove TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+                        afsCollectors.get(dpid).removeFlows(fbe.target());
+                    }
                     break;
                 case MODIFY:
                     mod = builder.buildFlowMod();
+
+                    if (adaptiveFlowSampling) {
+                        // Add or Update TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+                        // afsCollectors.get(dpid).addWithFlowRule(fbe.target()); //check if add is good or not
+                        afsCollectors.get(dpid).addOrUpdateFlows((FlowEntry) fbe.target());
+                    }
                     break;
                 default:
                     log.error("Unsupported batch operation {}; skipping flowmod {}",
-                              fbe.operator(), fbe);
+                            fbe.operator(), fbe);
                     continue;
             }
             sw.sendMsg(mod);
@@ -292,14 +365,24 @@
 
         @Override
         public void switchAdded(Dpid dpid) {
+
+            OpenFlowSwitch sw = controller.getSwitch(dpid);
+
             createCollector(controller.getSwitch(dpid));
         }
 
         @Override
         public void switchRemoved(Dpid dpid) {
-            FlowStatsCollector collector = collectors.remove(dpid);
-            if (collector != null) {
-                collector.stop();
+            if (adaptiveFlowSampling) {
+                NewAdaptiveFlowStatsCollector collector = afsCollectors.remove(dpid);
+                if (collector != null) {
+                    collector.stop();
+                }
+            } else {
+                FlowStatsCollector collector = simpleCollectors.remove(dpid);
+                if (collector != null) {
+                    collector.stop();
+                }
             }
         }
 
@@ -321,6 +404,11 @@
 
                     FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
                     providerService.flowRemoved(fr);
+
+                    if (adaptiveFlowSampling) {
+                        // Removed TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+                        afsCollectors.get(dpid).flowRemoved(fr);
+                    }
                     break;
                 case STATS_REPLY:
                     if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
@@ -370,11 +458,10 @@
                                               + " tell us which one.");
                         }
                     }
-                    break;
+
                 default:
                     log.debug("Unhandled message type: {}", msg.getType());
             }
-
         }
 
         @Override
@@ -392,7 +479,38 @@
                     .map(entry -> new FlowEntryBuilder(dpid, entry).build())
                     .collect(Collectors.toList());
 
-            providerService.pushFlowMetrics(did, flowEntries);
+            if (adaptiveFlowSampling)  {
+                NewAdaptiveFlowStatsCollector afsc = afsCollectors.get(dpid);
+
+                synchronized (afsc) {
+                    if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
+                        log.debug("OpenFlowRuleProvider:pushFlowMetrics, flowMissingXid={}, "
+                                        + "OFFlowStatsReply Xid={}, for {}",
+                                afsc.getFlowMissingXid(), replies.getXid(), dpid);
+                    }
+
+                    // Check that OFFlowStatsReply Xid is same with the one of OFFlowStatsRequest?
+                    if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
+                        if (afsc.getFlowMissingXid() == replies.getXid()) {
+                            // call entire flow stats update with flowMissing synchronization.
+                            // used existing pushFlowMetrics
+                            providerService.pushFlowMetrics(did, flowEntries);
+                        }
+                        // reset flowMissingXid to NO_FLOW_MISSING_XID
+                        afsc.setFlowMissingXid(NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID);
+
+                    } else {
+                        // call individual flow stats update
+                        providerService.pushFlowMetricsWithoutFlowMissing(did, flowEntries);
+                    }
+
+                    // Update TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+                    afsc.pushFlowMetrics(flowEntries);
+                }
+            } else {
+                // call existing entire flow stats update with flowMissing synchronization
+                providerService.pushFlowMetrics(did, flowEntries);
+            }
         }
     }