blob: c8bf707bd14247730c04d8e3646d909bda10f6dc [file] [log] [blame]
/*
* Copyright 2014-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.provider.of.flow.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onlab.util.SlidingWindowCounter;
import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.onosproject.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
import org.projectfloodlight.openflow.types.OFPort;
import org.projectfloodlight.openflow.types.TableId;
import org.slf4j.Logger;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Collects flow statistics for the specified switch.
*/
class FlowStatsCollector implements SwitchDataCollector {
private final Logger log = getLogger(getClass());
private static final int SECONDS = 1000;
// Number of ticks which defines the pause window.
private static final int PAUSE_WINDOW = 2;
// Number of ticks which defines the high load window
private static final int HIGH_WINDOW = 60;
// Number of ticks which defines the low load window
private static final int LOW_WINDOW = 15;
// Multiplier factor of the load
private static final int LOAD_FACTOR = 2;
// Event/s defining the min load rate
private static final int MIN_LOAD_RATE = 50;
// Event/s defining the max load rate
private static final int MAX_LOAD_RATE = 500;
private final OpenFlowSwitch sw;
private Timer timer;
private TimerTask pauseTask;
private TimerTask pollTask;
private SlidingWindowCounter loadCounter;
// Defines whether the collector is in pause or not for high load
private final AtomicBoolean paused = new AtomicBoolean();
// Defines whether the collector is in waiting or not for a previous stats reply
private final AtomicBoolean waiting = new AtomicBoolean();
private int pollInterval;
/**
* Creates a new collector for the given switch and poll frequency.
*
* @param timer timer to use for scheduling
* @param sw switch to pull
* @param pollInterval poll frequency in seconds
*/
FlowStatsCollector(Timer timer, OpenFlowSwitch sw, int pollInterval) {
this.timer = timer;
this.sw = checkNotNull(sw, "Null switch");
this.pollInterval = pollInterval;
}
/**
* Adjusts poll frequency.
*
* @param pollInterval poll frequency in seconds
*/
synchronized void adjustPollInterval(int pollInterval) {
this.pollInterval = pollInterval;
if (pollTask != null) {
pollTask.cancel();
}
// If we went through start - let's schedule it
if (loadCounter != null) {
pollTask = new PollTimerTask();
timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
}
}
/**
* Resets the collector's event count.
*/
@Override
public synchronized void resetEvents() {
loadCounter.clear();
if (paused.compareAndSet(true, false)) {
resume();
}
// Let's reset also waiting, the reply can be discarded/lost
// during a change of mastership
waiting.set(false);
}
/**
* Records a number of flow events that have occurred.
*
* @param events the number of events that occurred
*/
@Override
public void recordEvents(int events) {
SlidingWindowCounter loadCounter = this.loadCounter;
if (loadCounter != null) {
loadCounter.incrementCount(events);
}
}
/**
* Returns a boolean indicating whether the switch is under high load.
* <p>
* The switch is considered under high load if the average rate over the last two seconds is
* greater than twice the overall rate or 50 flows/sec.
*
* @return indicates whether the switch is under high load
*/
private boolean isHighLoad() {
return loadCounter.getWindowRate(PAUSE_WINDOW)
> max(min(loadCounter.getWindowRate(HIGH_WINDOW) * LOAD_FACTOR, MAX_LOAD_RATE), MIN_LOAD_RATE);
}
/**
* Returns a boolean indicating whether the switch is under low load.
* <p>
* The switch is considered under low load if the average rate over the last 15 seconds is
* less than the overall rate.
*
* @return indicates whether the switch is under low load
*/
private boolean isLowLoad() {
return loadCounter.getWindowRate(LOW_WINDOW) < loadCounter.getWindowRate(HIGH_WINDOW);
}
private class PauseTimerTask extends TimerTask {
@Override
public void run() {
if (isHighLoad()) {
if (paused.compareAndSet(false, true)) {
pause();
}
} else if (isLowLoad()) {
if (paused.compareAndSet(true, false)) {
resume();
}
}
}
}
private class PollTimerTask extends TimerTask {
@Override
public void run() {
// Check whether we are still waiting a previous reply
if (sw.getRole() == RoleState.MASTER && !waiting.get()) {
// Check whether the switch is under high load from this master. This is done here in case a large
// batch was pushed immediately prior to this task running.
if (isHighLoad()) {
log.debug("Skipping stats collection for {} due to high load; rate: {}; overall: {}",
sw.getStringId(),
loadCounter.getWindowRate(PAUSE_WINDOW),
loadCounter.getWindowRate(HIGH_WINDOW));
return;
} else {
log.debug(
"Permitting stats collection for {}; rate: {}; overall: {}",
sw.getStringId(),
loadCounter.getWindowRate(PAUSE_WINDOW),
loadCounter.getWindowRate(HIGH_WINDOW));
}
log.trace("Collecting stats for {}", sw.getStringId());
OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
.setMatch(sw.factory().matchWildcardAll())
.setTableId(TableId.ALL)
.setOutPort(OFPort.NO_MASK)
.build();
sw.sendMsg(request);
// Other flow stats will not be asked
// if we don't see first the reply of this request
waiting.set(true);
}
}
}
public synchronized void start() {
// Initially start polling quickly. Then drop down to configured value
log.debug("Starting Stats collection thread for {}", sw.getStringId());
loadCounter = new SlidingWindowCounter(HIGH_WINDOW);
pauseTask = new PauseTimerTask();
timer.scheduleAtFixedRate(pauseTask, 1 * SECONDS, 1 * SECONDS);
pollTask = new PollTimerTask();
timer.scheduleAtFixedRate(pollTask, 1 * SECONDS, pollInterval * SECONDS);
}
private synchronized void pause() {
if (pollTask != null) {
log.debug("Pausing stats collection for {}; rate: {}; overall: {}",
sw.getStringId(),
loadCounter.getWindowRate(PAUSE_WINDOW),
loadCounter.getWindowRate(HIGH_WINDOW));
pollTask.cancel();
pollTask = null;
}
}
private synchronized void resume() {
log.debug("Resuming stats collection for {}; rate: {}; overall: {}",
sw.getStringId(),
loadCounter.getWindowRate(PAUSE_WINDOW),
loadCounter.getWindowRate(HIGH_WINDOW));
pollTask = new PollTimerTask();
timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
}
public synchronized void stop() {
if (pauseTask != null) {
pauseTask.cancel();
pauseTask = null;
}
if (pollTask != null) {
log.debug("Stopping Stats collection thread for {}", sw.getStringId());
pollTask.cancel();
pollTask = null;
}
if (loadCounter != null) {
loadCounter.destroy();
loadCounter = null;
}
}
public void received() {
waiting.set(false);
}
}