OpenFlowRuleProvider is now configurable with respect to flowPollFrequency.

Change-Id: I3a559a9cd65df1ae56d80017696452788fc08d91
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
index 69e4235..ac2895e 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
@@ -23,8 +23,6 @@
  */
 public interface FlowRuleProvider extends Provider {
 
-    static final int POLL_INTERVAL = 10;
-
     /**
      * Instructs the provider to apply the specified flow rules to their
      * respective devices.
diff --git a/core/api/src/main/java/org/onosproject/net/statistic/DefaultLoad.java b/core/api/src/main/java/org/onosproject/net/statistic/DefaultLoad.java
index 908e7eb..97d3fe1 100644
--- a/core/api/src/main/java/org/onosproject/net/statistic/DefaultLoad.java
+++ b/core/api/src/main/java/org/onosproject/net/statistic/DefaultLoad.java
@@ -16,7 +16,6 @@
 package org.onosproject.net.statistic;
 
 import com.google.common.base.MoreObjects;
-import org.onosproject.net.flow.FlowRuleProvider;
 
 /**
  * Implementation of a load.
@@ -29,6 +28,11 @@
     private final long time;
 
     /**
+     * Indicates the flow statistics poll interval in seconds.
+     */
+    private static int pollInterval = 10;
+
+     /**
      * Creates an invalid load.
      */
     public DefaultLoad() {
@@ -50,9 +54,19 @@
         this.isValid = true;
     }
 
+    /**
+     * Sets the poll interval in seconds. Used solely for the purpose of
+     * computing the load.
+     *
+     * @param newPollInterval poll interval duration in seconds
+     */
+    public static void setPollInterval(int newPollInterval) {
+        pollInterval = newPollInterval;
+    }
+
     @Override
     public long rate() {
-        return (current - previous) / FlowRuleProvider.POLL_INTERVAL;
+        return (current - previous) / pollInterval;
     }
 
     @Override
diff --git a/providers/openflow/flow/pom.xml b/providers/openflow/flow/pom.xml
index 5981b7e..6cd60eb 100644
--- a/providers/openflow/flow/pom.xml
+++ b/providers/openflow/flow/pom.xml
@@ -31,4 +31,10 @@
 
     <description>ONOS OpenFlow protocol flow provider</description>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.compendium</artifactId>
+        </dependency>
+    </dependencies>
 </project>
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
index 62c1b3f..c4c81af 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
@@ -15,87 +15,86 @@
  */
 package org.onosproject.provider.of.flow.impl;
 
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.concurrent.TimeUnit;
-
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
+import org.onlab.util.SharedExecutors;
 import org.onosproject.openflow.controller.OpenFlowSwitch;
 import org.onosproject.openflow.controller.RoleState;
-import org.onlab.util.Timer;
 import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
 import org.projectfloodlight.openflow.types.OFPort;
 import org.projectfloodlight.openflow.types.TableId;
 import org.slf4j.Logger;
 
-public class FlowStatsCollector implements TimerTask {
+import java.util.Timer;
+import java.util.TimerTask;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Collects flow statistics for the specified switch.
+ */
+class FlowStatsCollector {
 
     private final Logger log = getLogger(getClass());
 
-    private final HashedWheelTimer timer = Timer.getTimer();
+    public static final int SECONDS = 1000;
+
     private final OpenFlowSwitch sw;
-    private final int refreshInterval;
+    private Timer timer;
+    private TimerTask task;
 
-    private Timeout timeout;
+    private int pollInterval;
 
-    private boolean stopTimer = false;;
-
-    public FlowStatsCollector(OpenFlowSwitch sw, int refreshInterval) {
+    /**
+     * Creates a new collector for the given switch and poll frequency.
+     *
+     * @param timer        timer to use for scheduling
+     * @param sw           switch to pull
+     * @param pollInterval poll frequency in seconds
+     */
+    FlowStatsCollector(Timer timer, OpenFlowSwitch sw, int pollInterval) {
+        this.timer = timer;
         this.sw = sw;
-        this.refreshInterval = refreshInterval;
+        this.pollInterval = pollInterval;
     }
 
-    @Override
-    public void run(Timeout timeout) throws Exception {
-        log.trace("Collecting stats for {}", this.sw.getStringId());
+    /**
+     * Adjusts poll frequency.
+     *
+     * @param pollInterval poll frequency in seconds
+     */
+    synchronized void adjustPollInterval(int pollInterval) {
+        this.pollInterval = pollInterval;
+        task.cancel();
+        task = new InternalTimerTask();
+        timer.scheduleAtFixedRate(task, pollInterval * SECONDS, pollInterval * 1000);
+    }
 
-        sendFlowStatistics();
-
-        if (!this.stopTimer) {
-            log.trace("Scheduling stats collection in {} seconds for {}",
-                    this.refreshInterval, this.sw.getStringId());
-            timeout.getTimer().newTimeout(this, refreshInterval,
-                    TimeUnit.SECONDS);
+    private class InternalTimerTask extends TimerTask {
+        @Override
+        public void run() {
+            if (sw.getRole() == RoleState.MASTER) {
+                log.trace("Collecting stats for {}", sw.getStringId());
+                OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
+                        .setMatch(sw.factory().matchWildcardAll())
+                        .setTableId(TableId.ALL)
+                        .setOutPort(OFPort.NO_MASK)
+                        .build();
+                sw.sendMsg(request);
+            }
         }
-
-
     }
 
-    private void sendFlowStatistics() {
-        if (log.isTraceEnabled()) {
-            log.trace("sendFlowStatistics {}:{}", sw.getStringId(), sw.getRole());
-        }
-        if (sw.getRole() != RoleState.MASTER) {
-            // Switch not master.
-            return;
-        }
-        OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
-                .setMatch(sw.factory().matchWildcardAll())
-                .setTableId(TableId.ALL)
-                .setOutPort(OFPort.NO_MASK)
-                .build();
-
-        this.sw.sendMsg(request);
-
+    public synchronized void start() {
+        // Initially start polling quickly. Then drop down to configured value
+        log.debug("Starting Stats collection thread for {}", sw.getStringId());
+        task = new InternalTimerTask();
+        SharedExecutors.getTimer().scheduleAtFixedRate(task, 1 * SECONDS,
+                                                       pollInterval * SECONDS);
     }
 
-    public void start() {
-
-        /*
-         * Initially start polling quickly. Then drop down to configured value
-         */
-        log.info("Starting Stats collection thread for {}",
-                this.sw.getStringId());
-        timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS);
-    }
-
-    public void stop() {
-        log.info("Stopping Stats collection thread for {}",
-                this.sw.getStringId());
-        this.stopTimer = true;
-        timeout.cancel();
+    public synchronized void stop() {
+        log.debug("Stopping Stats collection thread for {}", sw.getStringId());
+        task.cancel();
+        task = null;
     }
 
 }
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index 2621f31..08c87e1 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -15,21 +15,20 @@
  */
 package org.onosproject.provider.of.flow.impl;
 
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.flow.CompletedBatchOperation;
@@ -43,6 +42,7 @@
 import org.onosproject.net.flow.FlowRuleProviderService;
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
+import org.onosproject.net.statistic.DefaultLoad;
 import org.onosproject.openflow.controller.Dpid;
 import org.onosproject.openflow.controller.OpenFlowController;
 import org.onosproject.openflow.controller.OpenFlowEventListener;
@@ -50,6 +50,7 @@
 import org.onosproject.openflow.controller.OpenFlowSwitchListener;
 import org.onosproject.openflow.controller.RoleState;
 import org.onosproject.openflow.controller.ThirdPartyMessage;
+import org.osgi.service.component.ComponentContext;
 import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
 import org.projectfloodlight.openflow.protocol.OFErrorMsg;
 import org.projectfloodlight.openflow.protocol.OFErrorType;
@@ -63,12 +64,19 @@
 import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
 import org.slf4j.Logger;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalCause;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Provider which uses an OpenFlow controller to detect network end-station
@@ -86,12 +94,21 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenFlowController controller;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ComponentConfigService cfgService;
+
+    private static final int DEFAULT_POLL_FREQUENCY = 10;
+    @Property(name = "flowPollFrequency", intValue = DEFAULT_POLL_FREQUENCY,
+            label = "Frequency (in seconds) for polling flow statistics")
+    private int flowPollFrequency = DEFAULT_POLL_FREQUENCY;
+
     private FlowRuleProviderService providerService;
 
     private final InternalFlowProvider listener = new InternalFlowProvider();
 
     private Cache<Long, InternalCacheEntry> pendingBatches;
 
+    private final Timer timer = new Timer("onos-openflow-collector");
     private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
 
     /**
@@ -102,42 +119,79 @@
     }
 
     @Activate
-    public void activate() {
+    public void activate(ComponentContext context) {
+        cfgService.registerProperties(getClass());
         providerService = providerRegistry.register(this);
         controller.addListener(listener);
         controller.addEventListener(listener);
 
-        pendingBatches = CacheBuilder
-                .newBuilder()
-                .expireAfterWrite(10, TimeUnit.SECONDS)
-                .removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> {
-                                     if (notification.getCause() == RemovalCause.EXPIRED) {
-                                         providerService
-                                                 .batchOperationCompleted(notification
-                                                                                  .getKey(),
-                                                                          notification
-                                                                                  .getValue()
-                                                                                  .failedCompletion());
-                                     }
-                                 }).build();
-
-        for (OpenFlowSwitch sw : controller.getSwitches()) {
-            FlowStatsCollector fsc = new FlowStatsCollector(sw, POLL_INTERVAL);
-            fsc.start();
-            collectors.put(new Dpid(sw.getId()), fsc);
-        }
+        pendingBatches = createBatchCache();
+        createCollectors();
 
         log.info("Started");
     }
 
     @Deactivate
-    public void deactivate() {
+    public void deactivate(ComponentContext context) {
+        cfgService.unregisterProperties(getClass(), false);
+        stopCollectors();
         providerRegistry.unregister(this);
         providerService = null;
 
         log.info("Stopped");
     }
 
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+        int newFlowPollFrequency;
+        try {
+            String s = get(properties, "flowPollFrequency");
+            newFlowPollFrequency = isNullOrEmpty(s) ? flowPollFrequency : Integer.parseInt(s.trim());
+
+        } catch (NumberFormatException | ClassCastException e) {
+            newFlowPollFrequency = flowPollFrequency;
+        }
+
+        if (newFlowPollFrequency != flowPollFrequency) {
+            flowPollFrequency = newFlowPollFrequency;
+            adjustRate();
+        }
+
+        log.info("Settings: flowPollFrequency={}", flowPollFrequency);
+    }
+
+    private Cache<Long, InternalCacheEntry> createBatchCache() {
+        return CacheBuilder.newBuilder()
+                .expireAfterWrite(10, TimeUnit.SECONDS)
+                .removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> {
+                    if (notification.getCause() == RemovalCause.EXPIRED) {
+                        providerService.batchOperationCompleted(notification.getKey(),
+                                                                notification.getValue().failedCompletion());
+                    }
+                }).build();
+    }
+
+    private void createCollectors() {
+        controller.getSwitches().forEach(this::createCollector);
+    }
+
+    private void createCollector(OpenFlowSwitch sw) {
+        FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency);
+        fsc.start();
+        collectors.put(new Dpid(sw.getId()), fsc);
+    }
+
+    private void stopCollectors() {
+        collectors.values().forEach(FlowStatsCollector::stop);
+        collectors.clear();
+    }
+
+    private void adjustRate() {
+        DefaultLoad.setPollInterval(flowPollFrequency);
+        collectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency));
+    }
+
     @Override
     public void applyFlowRule(FlowRule... flowRules) {
         for (FlowRule flowRule : flowRules) {
@@ -147,7 +201,7 @@
 
     private void applyRule(FlowRule flowRule) {
         OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
-                .uri()));
+                                                                   .uri()));
         FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
         if (hasPayload(flowRuleExtPayLoad)) {
             OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
@@ -167,7 +221,7 @@
 
     private void removeRule(FlowRule flowRule) {
         OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
-                .uri()));
+                                                                   .uri()));
         FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
         if (hasPayload(flowRuleExtPayLoad)) {
             OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
@@ -190,7 +244,7 @@
         pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
 
         OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId()
-                .uri()));
+                                                                   .uri()));
         OFFlowMod mod;
         for (FlowRuleBatchEntry fbe : batch.getOperations()) {
             // flow is the third party privacy flow
@@ -204,19 +258,19 @@
             FlowModBuilder builder = FlowModBuilder.builder(fbe.target(), sw
                     .factory(), Optional.of(batch.id()));
             switch (fbe.operator()) {
-            case ADD:
-                mod = builder.buildFlowAdd();
-                break;
-            case REMOVE:
-                mod = builder.buildFlowDel();
-                break;
-            case MODIFY:
-                mod = builder.buildFlowMod();
-                break;
-            default:
-                log.error("Unsupported batch operation {}; skipping flowmod {}",
-                          fbe.operator(), fbe);
-                continue;
+                case ADD:
+                    mod = builder.buildFlowAdd();
+                    break;
+                case REMOVE:
+                    mod = builder.buildFlowDel();
+                    break;
+                case MODIFY:
+                    mod = builder.buildFlowMod();
+                    break;
+                default:
+                    log.error("Unsupported batch operation {}; skipping flowmod {}",
+                              fbe.operator(), fbe);
+                    continue;
             }
             sw.sendMsg(mod);
         }
@@ -236,12 +290,7 @@
 
         @Override
         public void switchAdded(Dpid dpid) {
-            FlowStatsCollector fsc = new FlowStatsCollector(
-                                                            controller
-                                                                    .getSwitch(dpid),
-                                                            POLL_INTERVAL);
-            fsc.start();
-            collectors.put(dpid, fsc);
+            createCollector(controller.getSwitch(dpid));
         }
 
         @Override
@@ -265,64 +314,64 @@
         public void handleMessage(Dpid dpid, OFMessage msg) {
             OpenFlowSwitch sw = controller.getSwitch(dpid);
             switch (msg.getType()) {
-            case FLOW_REMOVED:
-                OFFlowRemoved removed = (OFFlowRemoved) msg;
+                case FLOW_REMOVED:
+                    OFFlowRemoved removed = (OFFlowRemoved) msg;
 
-                FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
-                providerService.flowRemoved(fr);
-                break;
-            case STATS_REPLY:
-                if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
-                    pushFlowMetrics(dpid, (OFFlowStatsReply) msg);
-                }
-                break;
-            case BARRIER_REPLY:
-                try {
-                    InternalCacheEntry entry = pendingBatches.getIfPresent(msg
-                            .getXid());
-                    if (entry != null) {
-                        providerService
-                                .batchOperationCompleted(msg.getXid(),
-                                                         entry.completed());
-                    } else {
-                        log.warn("Received unknown Barrier Reply: {}",
-                                 msg.getXid());
+                    FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
+                    providerService.flowRemoved(fr);
+                    break;
+                case STATS_REPLY:
+                    if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
+                        pushFlowMetrics(dpid, (OFFlowStatsReply) msg);
                     }
-                } finally {
-                    pendingBatches.invalidate(msg.getXid());
-                }
-                break;
-            case ERROR:
-                log.warn("received Error message {} from {}", msg, dpid);
-
-                OFErrorMsg error = (OFErrorMsg) msg;
-                if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) {
-                    OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error;
-                    if (fmFailed.getData().getParsedMessage().isPresent()) {
-                        OFMessage m = fmFailed.getData().getParsedMessage()
-                                .get();
-                        OFFlowMod fm = (OFFlowMod) m;
-                        InternalCacheEntry entry = pendingBatches
-                                .getIfPresent(msg.getXid());
+                    break;
+                case BARRIER_REPLY:
+                    try {
+                        InternalCacheEntry entry = pendingBatches.getIfPresent(msg
+                                                                                       .getXid());
                         if (entry != null) {
-                            entry.appendFailure(new FlowEntryBuilder(dpid, fm)
-                                    .build());
+                            providerService
+                                    .batchOperationCompleted(msg.getXid(),
+                                                             entry.completed());
                         } else {
-                            log.error("No matching batch for this error: {}",
-                                      error);
+                            log.warn("Received unknown Barrier Reply: {}",
+                                     msg.getXid());
+                        }
+                    } finally {
+                        pendingBatches.invalidate(msg.getXid());
+                    }
+                    break;
+                case ERROR:
+                    log.warn("received Error message {} from {}", msg, dpid);
+
+                    OFErrorMsg error = (OFErrorMsg) msg;
+                    if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) {
+                        OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error;
+                        if (fmFailed.getData().getParsedMessage().isPresent()) {
+                            OFMessage m = fmFailed.getData().getParsedMessage()
+                                    .get();
+                            OFFlowMod fm = (OFFlowMod) m;
+                            InternalCacheEntry entry = pendingBatches
+                                    .getIfPresent(msg.getXid());
+                            if (entry != null) {
+                                entry.appendFailure(new FlowEntryBuilder(dpid, fm)
+                                                            .build());
+                            } else {
+                                log.error("No matching batch for this error: {}",
+                                          error);
+                            }
+                        } else {
+                            // FIXME: Potentially add flowtracking to avoid this
+                            // message.
+                            log.error("Flow installation failed but switch didn't"
+                                              + " tell us which one.");
                         }
                     } else {
-                        // FIXME: Potentially add flowtracking to avoid this
-                        // message.
-                        log.error("Flow installation failed but switch didn't"
-                                + " tell us which one.");
+                        log.warn("Received error {}", error);
                     }
-                } else {
-                    log.warn("Received error {}", error);
-                }
 
-            default:
-                log.debug("Unhandled message type: {}", msg.getType());
+                default:
+                    log.debug("Unhandled message type: {}", msg.getType());
             }
 
         }
@@ -343,15 +392,13 @@
                     .collect(Collectors.toList());
 
             providerService.pushFlowMetrics(did, flowEntries);
-
         }
-
     }
 
     /**
      * The internal cache entry holding the original request as well as
      * accumulating the any failures along the way.
-     *
+     * <p/>
      * If this entry is evicted from the cache then the entire operation is
      * considered failed. Otherwise, only the failures reported by the device
      * will be propagated up.
@@ -395,12 +442,11 @@
          */
         public CompletedBatchOperation completed() {
             return new CompletedBatchOperation(
-                                               failures.isEmpty(),
-                                               Collections
-                                                       .unmodifiableSet(failures),
-                                               operation.deviceId());
+                    failures.isEmpty(),
+                    Collections
+                            .unmodifiableSet(failures),
+                    operation.deviceId());
         }
-
     }
 
 }