OpenFlowRuleProvider is now configurable with respect to flowPollFrequency.
Change-Id: I3a559a9cd65df1ae56d80017696452788fc08d91
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
index 69e4235..ac2895e 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
@@ -23,8 +23,6 @@
*/
public interface FlowRuleProvider extends Provider {
- static final int POLL_INTERVAL = 10;
-
/**
* Instructs the provider to apply the specified flow rules to their
* respective devices.
diff --git a/core/api/src/main/java/org/onosproject/net/statistic/DefaultLoad.java b/core/api/src/main/java/org/onosproject/net/statistic/DefaultLoad.java
index 908e7eb..97d3fe1 100644
--- a/core/api/src/main/java/org/onosproject/net/statistic/DefaultLoad.java
+++ b/core/api/src/main/java/org/onosproject/net/statistic/DefaultLoad.java
@@ -16,7 +16,6 @@
package org.onosproject.net.statistic;
import com.google.common.base.MoreObjects;
-import org.onosproject.net.flow.FlowRuleProvider;
/**
* Implementation of a load.
@@ -29,6 +28,11 @@
private final long time;
/**
+ * Indicates the flow statistics poll interval in seconds.
+ */
+ private static int pollInterval = 10;
+
+ /**
* Creates an invalid load.
*/
public DefaultLoad() {
@@ -50,9 +54,19 @@
this.isValid = true;
}
+ /**
+ * Sets the poll interval in seconds. Used solely for the purpose of
+ * computing the load.
+ *
+ * @param newPollInterval poll interval duration in seconds
+ */
+ public static void setPollInterval(int newPollInterval) {
+ pollInterval = newPollInterval;
+ }
+
@Override
public long rate() {
- return (current - previous) / FlowRuleProvider.POLL_INTERVAL;
+ return (current - previous) / pollInterval;
}
@Override
diff --git a/providers/openflow/flow/pom.xml b/providers/openflow/flow/pom.xml
index 5981b7e..6cd60eb 100644
--- a/providers/openflow/flow/pom.xml
+++ b/providers/openflow/flow/pom.xml
@@ -31,4 +31,10 @@
<description>ONOS OpenFlow protocol flow provider</description>
+ <dependencies>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </dependency>
+ </dependencies>
</project>
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
index 62c1b3f..c4c81af 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
@@ -15,87 +15,86 @@
*/
package org.onosproject.provider.of.flow.impl;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.concurrent.TimeUnit;
-
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
+import org.onlab.util.SharedExecutors;
import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.onosproject.openflow.controller.RoleState;
-import org.onlab.util.Timer;
import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
import org.projectfloodlight.openflow.types.OFPort;
import org.projectfloodlight.openflow.types.TableId;
import org.slf4j.Logger;
-public class FlowStatsCollector implements TimerTask {
+import java.util.Timer;
+import java.util.TimerTask;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Collects flow statistics for the specified switch.
+ */
+class FlowStatsCollector {
private final Logger log = getLogger(getClass());
- private final HashedWheelTimer timer = Timer.getTimer();
+ public static final int SECONDS = 1000;
+
private final OpenFlowSwitch sw;
- private final int refreshInterval;
+ private Timer timer;
+ private TimerTask task;
- private Timeout timeout;
+ private int pollInterval;
- private boolean stopTimer = false;;
-
- public FlowStatsCollector(OpenFlowSwitch sw, int refreshInterval) {
+ /**
+ * Creates a new collector for the given switch and poll frequency.
+ *
+ * @param timer timer to use for scheduling
+ * @param sw switch to pull
+ * @param pollInterval poll frequency in seconds
+ */
+ FlowStatsCollector(Timer timer, OpenFlowSwitch sw, int pollInterval) {
+ this.timer = timer;
this.sw = sw;
- this.refreshInterval = refreshInterval;
+ this.pollInterval = pollInterval;
}
- @Override
- public void run(Timeout timeout) throws Exception {
- log.trace("Collecting stats for {}", this.sw.getStringId());
+ /**
+ * Adjusts poll frequency.
+ *
+ * @param pollInterval poll frequency in seconds
+ */
+ synchronized void adjustPollInterval(int pollInterval) {
+ this.pollInterval = pollInterval;
+ task.cancel();
+ task = new InternalTimerTask();
+ timer.scheduleAtFixedRate(task, pollInterval * SECONDS, pollInterval * 1000);
+ }
- sendFlowStatistics();
-
- if (!this.stopTimer) {
- log.trace("Scheduling stats collection in {} seconds for {}",
- this.refreshInterval, this.sw.getStringId());
- timeout.getTimer().newTimeout(this, refreshInterval,
- TimeUnit.SECONDS);
+ private class InternalTimerTask extends TimerTask {
+ @Override
+ public void run() {
+ if (sw.getRole() == RoleState.MASTER) {
+ log.trace("Collecting stats for {}", sw.getStringId());
+ OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
+ .setMatch(sw.factory().matchWildcardAll())
+ .setTableId(TableId.ALL)
+ .setOutPort(OFPort.NO_MASK)
+ .build();
+ sw.sendMsg(request);
+ }
}
-
-
}
- private void sendFlowStatistics() {
- if (log.isTraceEnabled()) {
- log.trace("sendFlowStatistics {}:{}", sw.getStringId(), sw.getRole());
- }
- if (sw.getRole() != RoleState.MASTER) {
- // Switch not master.
- return;
- }
- OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
- .setMatch(sw.factory().matchWildcardAll())
- .setTableId(TableId.ALL)
- .setOutPort(OFPort.NO_MASK)
- .build();
-
- this.sw.sendMsg(request);
-
+ public synchronized void start() {
+ // Initially start polling quickly. Then drop down to configured value
+ log.debug("Starting Stats collection thread for {}", sw.getStringId());
+ task = new InternalTimerTask();
+ SharedExecutors.getTimer().scheduleAtFixedRate(task, 1 * SECONDS,
+ pollInterval * SECONDS);
}
- public void start() {
-
- /*
- * Initially start polling quickly. Then drop down to configured value
- */
- log.info("Starting Stats collection thread for {}",
- this.sw.getStringId());
- timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS);
- }
-
- public void stop() {
- log.info("Stopping Stats collection thread for {}",
- this.sw.getStringId());
- this.stopTimer = true;
- timeout.cancel();
+ public synchronized void stop() {
+ log.debug("Stopping Stats collection thread for {}", sw.getStringId());
+ task.cancel();
+ task = null;
}
}
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index 2621f31..08c87e1 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -15,21 +15,20 @@
*/
package org.onosproject.provider.of.flow.impl;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.flow.CompletedBatchOperation;
@@ -43,6 +42,7 @@
import org.onosproject.net.flow.FlowRuleProviderService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
+import org.onosproject.net.statistic.DefaultLoad;
import org.onosproject.openflow.controller.Dpid;
import org.onosproject.openflow.controller.OpenFlowController;
import org.onosproject.openflow.controller.OpenFlowEventListener;
@@ -50,6 +50,7 @@
import org.onosproject.openflow.controller.OpenFlowSwitchListener;
import org.onosproject.openflow.controller.RoleState;
import org.onosproject.openflow.controller.ThirdPartyMessage;
+import org.osgi.service.component.ComponentContext;
import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
import org.projectfloodlight.openflow.protocol.OFErrorType;
@@ -63,12 +64,19 @@
import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
import org.slf4j.Logger;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalCause;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Timer;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Provider which uses an OpenFlow controller to detect network end-station
@@ -86,12 +94,21 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenFlowController controller;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ComponentConfigService cfgService;
+
+ private static final int DEFAULT_POLL_FREQUENCY = 10;
+ @Property(name = "flowPollFrequency", intValue = DEFAULT_POLL_FREQUENCY,
+ label = "Frequency (in seconds) for polling flow statistics")
+ private int flowPollFrequency = DEFAULT_POLL_FREQUENCY;
+
private FlowRuleProviderService providerService;
private final InternalFlowProvider listener = new InternalFlowProvider();
private Cache<Long, InternalCacheEntry> pendingBatches;
+ private final Timer timer = new Timer("onos-openflow-collector");
private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
/**
@@ -102,42 +119,79 @@
}
@Activate
- public void activate() {
+ public void activate(ComponentContext context) {
+ cfgService.registerProperties(getClass());
providerService = providerRegistry.register(this);
controller.addListener(listener);
controller.addEventListener(listener);
- pendingBatches = CacheBuilder
- .newBuilder()
- .expireAfterWrite(10, TimeUnit.SECONDS)
- .removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> {
- if (notification.getCause() == RemovalCause.EXPIRED) {
- providerService
- .batchOperationCompleted(notification
- .getKey(),
- notification
- .getValue()
- .failedCompletion());
- }
- }).build();
-
- for (OpenFlowSwitch sw : controller.getSwitches()) {
- FlowStatsCollector fsc = new FlowStatsCollector(sw, POLL_INTERVAL);
- fsc.start();
- collectors.put(new Dpid(sw.getId()), fsc);
- }
+ pendingBatches = createBatchCache();
+ createCollectors();
log.info("Started");
}
@Deactivate
- public void deactivate() {
+ public void deactivate(ComponentContext context) {
+ cfgService.unregisterProperties(getClass(), false);
+ stopCollectors();
providerRegistry.unregister(this);
providerService = null;
log.info("Stopped");
}
+ @Modified
+ public void modified(ComponentContext context) {
+ Dictionary<?, ?> properties = context.getProperties();
+ int newFlowPollFrequency;
+ try {
+ String s = get(properties, "flowPollFrequency");
+ newFlowPollFrequency = isNullOrEmpty(s) ? flowPollFrequency : Integer.parseInt(s.trim());
+
+ } catch (NumberFormatException | ClassCastException e) {
+ newFlowPollFrequency = flowPollFrequency;
+ }
+
+ if (newFlowPollFrequency != flowPollFrequency) {
+ flowPollFrequency = newFlowPollFrequency;
+ adjustRate();
+ }
+
+ log.info("Settings: flowPollFrequency={}", flowPollFrequency);
+ }
+
+ private Cache<Long, InternalCacheEntry> createBatchCache() {
+ return CacheBuilder.newBuilder()
+ .expireAfterWrite(10, TimeUnit.SECONDS)
+ .removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> {
+ if (notification.getCause() == RemovalCause.EXPIRED) {
+ providerService.batchOperationCompleted(notification.getKey(),
+ notification.getValue().failedCompletion());
+ }
+ }).build();
+ }
+
+ private void createCollectors() {
+ controller.getSwitches().forEach(this::createCollector);
+ }
+
+ private void createCollector(OpenFlowSwitch sw) {
+ FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency);
+ fsc.start();
+ collectors.put(new Dpid(sw.getId()), fsc);
+ }
+
+ private void stopCollectors() {
+ collectors.values().forEach(FlowStatsCollector::stop);
+ collectors.clear();
+ }
+
+ private void adjustRate() {
+ DefaultLoad.setPollInterval(flowPollFrequency);
+ collectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency));
+ }
+
@Override
public void applyFlowRule(FlowRule... flowRules) {
for (FlowRule flowRule : flowRules) {
@@ -147,7 +201,7 @@
private void applyRule(FlowRule flowRule) {
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
- .uri()));
+ .uri()));
FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
if (hasPayload(flowRuleExtPayLoad)) {
OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
@@ -167,7 +221,7 @@
private void removeRule(FlowRule flowRule) {
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
- .uri()));
+ .uri()));
FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
if (hasPayload(flowRuleExtPayLoad)) {
OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
@@ -190,7 +244,7 @@
pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId()
- .uri()));
+ .uri()));
OFFlowMod mod;
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
// flow is the third party privacy flow
@@ -204,19 +258,19 @@
FlowModBuilder builder = FlowModBuilder.builder(fbe.target(), sw
.factory(), Optional.of(batch.id()));
switch (fbe.operator()) {
- case ADD:
- mod = builder.buildFlowAdd();
- break;
- case REMOVE:
- mod = builder.buildFlowDel();
- break;
- case MODIFY:
- mod = builder.buildFlowMod();
- break;
- default:
- log.error("Unsupported batch operation {}; skipping flowmod {}",
- fbe.operator(), fbe);
- continue;
+ case ADD:
+ mod = builder.buildFlowAdd();
+ break;
+ case REMOVE:
+ mod = builder.buildFlowDel();
+ break;
+ case MODIFY:
+ mod = builder.buildFlowMod();
+ break;
+ default:
+ log.error("Unsupported batch operation {}; skipping flowmod {}",
+ fbe.operator(), fbe);
+ continue;
}
sw.sendMsg(mod);
}
@@ -236,12 +290,7 @@
@Override
public void switchAdded(Dpid dpid) {
- FlowStatsCollector fsc = new FlowStatsCollector(
- controller
- .getSwitch(dpid),
- POLL_INTERVAL);
- fsc.start();
- collectors.put(dpid, fsc);
+ createCollector(controller.getSwitch(dpid));
}
@Override
@@ -265,64 +314,64 @@
public void handleMessage(Dpid dpid, OFMessage msg) {
OpenFlowSwitch sw = controller.getSwitch(dpid);
switch (msg.getType()) {
- case FLOW_REMOVED:
- OFFlowRemoved removed = (OFFlowRemoved) msg;
+ case FLOW_REMOVED:
+ OFFlowRemoved removed = (OFFlowRemoved) msg;
- FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
- providerService.flowRemoved(fr);
- break;
- case STATS_REPLY:
- if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
- pushFlowMetrics(dpid, (OFFlowStatsReply) msg);
- }
- break;
- case BARRIER_REPLY:
- try {
- InternalCacheEntry entry = pendingBatches.getIfPresent(msg
- .getXid());
- if (entry != null) {
- providerService
- .batchOperationCompleted(msg.getXid(),
- entry.completed());
- } else {
- log.warn("Received unknown Barrier Reply: {}",
- msg.getXid());
+ FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
+ providerService.flowRemoved(fr);
+ break;
+ case STATS_REPLY:
+ if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
+ pushFlowMetrics(dpid, (OFFlowStatsReply) msg);
}
- } finally {
- pendingBatches.invalidate(msg.getXid());
- }
- break;
- case ERROR:
- log.warn("received Error message {} from {}", msg, dpid);
-
- OFErrorMsg error = (OFErrorMsg) msg;
- if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) {
- OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error;
- if (fmFailed.getData().getParsedMessage().isPresent()) {
- OFMessage m = fmFailed.getData().getParsedMessage()
- .get();
- OFFlowMod fm = (OFFlowMod) m;
- InternalCacheEntry entry = pendingBatches
- .getIfPresent(msg.getXid());
+ break;
+ case BARRIER_REPLY:
+ try {
+ InternalCacheEntry entry = pendingBatches.getIfPresent(msg
+ .getXid());
if (entry != null) {
- entry.appendFailure(new FlowEntryBuilder(dpid, fm)
- .build());
+ providerService
+ .batchOperationCompleted(msg.getXid(),
+ entry.completed());
} else {
- log.error("No matching batch for this error: {}",
- error);
+ log.warn("Received unknown Barrier Reply: {}",
+ msg.getXid());
+ }
+ } finally {
+ pendingBatches.invalidate(msg.getXid());
+ }
+ break;
+ case ERROR:
+ log.warn("received Error message {} from {}", msg, dpid);
+
+ OFErrorMsg error = (OFErrorMsg) msg;
+ if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) {
+ OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error;
+ if (fmFailed.getData().getParsedMessage().isPresent()) {
+ OFMessage m = fmFailed.getData().getParsedMessage()
+ .get();
+ OFFlowMod fm = (OFFlowMod) m;
+ InternalCacheEntry entry = pendingBatches
+ .getIfPresent(msg.getXid());
+ if (entry != null) {
+ entry.appendFailure(new FlowEntryBuilder(dpid, fm)
+ .build());
+ } else {
+ log.error("No matching batch for this error: {}",
+ error);
+ }
+ } else {
+ // FIXME: Potentially add flowtracking to avoid this
+ // message.
+ log.error("Flow installation failed but switch didn't"
+ + " tell us which one.");
}
} else {
- // FIXME: Potentially add flowtracking to avoid this
- // message.
- log.error("Flow installation failed but switch didn't"
- + " tell us which one.");
+ log.warn("Received error {}", error);
}
- } else {
- log.warn("Received error {}", error);
- }
- default:
- log.debug("Unhandled message type: {}", msg.getType());
+ default:
+ log.debug("Unhandled message type: {}", msg.getType());
}
}
@@ -343,15 +392,13 @@
.collect(Collectors.toList());
providerService.pushFlowMetrics(did, flowEntries);
-
}
-
}
/**
* The internal cache entry holding the original request as well as
* accumulating the any failures along the way.
- *
+ * <p/>
* If this entry is evicted from the cache then the entire operation is
* considered failed. Otherwise, only the failures reported by the device
* will be propagated up.
@@ -395,12 +442,11 @@
*/
public CompletedBatchOperation completed() {
return new CompletedBatchOperation(
- failures.isEmpty(),
- Collections
- .unmodifiableSet(failures),
- operation.deviceId());
+ failures.isEmpty(),
+ Collections
+ .unmodifiableSet(failures),
+ operation.deviceId());
}
-
}
}