// blob: 029d70ae55ce639df0689f2dd9ead7b941940cbd [file] [log] [blame]
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.intentperf;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.onlab.packet.MacAddress;
import org.onlab.util.Counter;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentListener;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.PartitionService;
import org.onosproject.net.intent.PointToPointIntent;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
import static org.onlab.util.Tools.delay;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.intent.IntentEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Application to test sustained intent throughput.
*/
@Component(immediate = true)
public class IntentPerfInstaller {
private final Logger log = getLogger(getClass());

// Size of the worker pool and default values of the tunable properties below.
private static final int DEFAULT_NUM_WORKERS = 1;
private static final int DEFAULT_NUM_KEYS = 40_000;
private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1_000; //ms
private static final int DEFAULT_NUM_NEIGHBORS = 0;

// Delay between activation and the start of the test run.
private static final int START_DELAY = 5_000; // ms
// Interval between two throughput reports.
private static final int REPORT_PERIOD = 5_000; //ms

//FIXME add path length

@Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS,
        label = "Number of keys (i.e. unique intents) to generate per instance")
private int numKeys = DEFAULT_NUM_KEYS;

//TODO implement numWorkers property
// @Property(name = "numThreads", intValue = DEFAULT_NUM_WORKERS,
//           label = "Number of installer threads per instance")
// private int numWorkers = DEFAULT_NUM_WORKERS;

@Property(name = "cyclePeriod", intValue = DEFAULT_GOAL_CYCLE_PERIOD,
        label = "Goal for cycle period (in ms)")
private int cyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;

@Property(name = "numNeighbors", intValue = DEFAULT_NUM_NEIGHBORS,
        label = "Number of neighbors to generate intents for")
private int numNeighbors = DEFAULT_NUM_NEIGHBORS;

@Reference(cardinality = MANDATORY_UNARY)
protected CoreService coreService;

@Reference(cardinality = MANDATORY_UNARY)
protected IntentService intentService;

@Reference(cardinality = MANDATORY_UNARY)
protected ClusterService clusterService;

@Reference(cardinality = MANDATORY_UNARY)
protected DeviceService deviceService;

@Reference(cardinality = MANDATORY_UNARY)
protected MastershipService mastershipService;

@Reference(cardinality = MANDATORY_UNARY)
protected PartitionService partitionService;

@Reference(cardinality = MANDATORY_UNARY)
protected ComponentConfigService configService;

@Reference(cardinality = MANDATORY_UNARY)
protected IntentPerfCollector sampleCollector;

// Pool that runs the Submitter tasks.
private ExecutorService workers;

private ApplicationId appId;

// Throughput listener; created in start() and cleared in stop().
private Listener listener;

// Set by stop()/start() and polled by the worker loop on a different thread; it must be
// volatile so that the worker is guaranteed to observe the stop request.
private volatile boolean stopped;

// Drives both the delayed start and the periodic throughput report.
private Timer reportTimer;

// FIXME this variable isn't shared properly between multiple worker threads
private int lastKey = 0;

private IntentPerfUi perfUi;
/**
 * Activates the component: registers an application id derived from the local node IP,
 * creates the reporter timer and the worker pool, turns off flow-rule backups and
 * schedules {@link #start()} to run after {@code START_DELAY} milliseconds.
 */
@Activate
public void activate() {
    // configService.registerProperties(getClass());

    final String localIp = clusterService.getLocalNode().ip().toString();
    appId = coreService.registerApplication("org.onosproject.intentperf." + localIp);

    reportTimer = new Timer("onos-intent-perf-reporter");
    workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS,
                                           groupedThreads("onos/intent-perf", "worker-%d"));

    // disable flow backups for testing
    final String flowStore = "org.onosproject.store.flow.impl.DistributedFlowRuleStore";
    log.info("flow props: {}", configService.getProperties(flowStore));
    configService.setProperty(flowStore, "backupEnabled", "false");

    // Schedule delayed start
    final TimerTask delayedStart = new TimerTask() {
        @Override
        public void run() {
            start();
        }
    };
    reportTimer.schedule(delayedStart, START_DELAY);
}
/**
 * Deactivates the component by stopping the test run.
 */
@Deactivate
public void deactivate() {
    // Property unregistration is currently disabled:
    // configService.unregisterProperties(getClass(), false);
    stop();
}
//FIXME add modified
/**
 * Logs the application id together with the current values of the tunable settings.
 *
 * @param prefix text placed at the beginning of the log message
 */
private void logConfig(String prefix) {
    final String template =
            "{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}";
    log.info(template, prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
}
/**
 * Starts the test run: caps the neighbor count to the number of other cluster nodes,
 * registers the throughput listener, schedules the periodic report and submits the
 * intent submitter task(s) to the worker pool.
 */
public void start() {
    // adjust numNeighbors so it never exceeds the number of other nodes
    final int otherNodes = clusterService.getNodes().size() - 1;
    numNeighbors = Math.min(otherNodes, numNeighbors);

    // perhaps we want to prime before listening...
    // we will need to discard the first few results for priming and warmup
    listener = new Listener();
    intentService.addListener(listener);

    // Schedule reporter task on report period boundary
    final TimerTask reporter = new TimerTask() {
        @Override
        public void run() {
            //adjustRates(); // FIXME we currently adjust rates in the cycle thread
            listener.report();
        }
    };
    final long untilBoundary = REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD;
    reportTimer.scheduleAtFixedRate(reporter, untilBoundary, REPORT_PERIOD);

    // Submit workers
    stopped = false;
    int pending = DEFAULT_NUM_WORKERS;
    while (pending-- > 0) {
        Set<Intent> batch = createIntents(numKeys, /*FIXME*/ 2, lastKey);
        workers.submit(new Submitter(batch));
    }

    logConfig("Started");
}
/**
 * Stops the test run.
 * <p>
 * Signals the worker loop to end, cancels the timer (including a start that is still
 * pending), shuts the worker pool down and waits up to five seconds for it to finish,
 * and finally detaches the throughput listener. The component cannot be started again
 * afterwards because both the timer and the pool are released here.
 */
public void stop() {
    stopped = true;

    // Cancel the timer even when no listener exists yet: if stop() runs before the
    // delayed start has fired, the pending start task must not run afterwards.
    if (reportTimer != null) {
        reportTimer.cancel();
        reportTimer = null;
    }

    if (workers != null) {
        // awaitTermination() can only return early after a shutdown was requested;
        // without it the call always blocks for the whole timeout and the pool leaks.
        workers.shutdown();
        try {
            if (!workers.awaitTermination(5, TimeUnit.SECONDS)) {
                log.warn("Workers did not terminate within 5 seconds");
            }
        } catch (InterruptedException e) {
            log.warn("Failed to stop worker", e);
            // keep the interrupt visible to whoever called us
            Thread.currentThread().interrupt();
        }
    }

    // Detach the listener last: the worker reads it while adjusting its rate, so it
    // should stay in place until the worker had its chance to finish.
    if (listener != null) {
        intentService.removeListener(listener);
        listener = null;
    }

    log.info("Stopped");
}
/**
 * Builds the list of nodes to generate intents for: the local node at index 0 followed
 * by the next {@code numNeighbors} cluster nodes in id order.
 *
 * @return local node plus the selected neighbors
 */
private List<NodeId> getNeighbors() {
    List<NodeId> ordered = new ArrayList<>();
    for (ControllerNode member : clusterService.getNodes()) {
        ordered.add(member.id());
    }

    // sort neighbors by the textual form of their id
    ordered.sort((left, right) -> left.toString().compareTo(right.toString()));

    // rotate the local node to index 0
    final int localIndex = ordered.indexOf(clusterService.getLocalNode().id());
    Collections.rotate(ordered, -localIndex);
    log.debug("neighbors (raw): {}", ordered); //TODO remove

    // keep only the local node and the selected neighbors
    List<NodeId> selected = ordered.subList(0, numNeighbors + 1);
    log.debug("neighbors: {}", selected); //TODO remove
    return selected;
}
/**
 * Creates a point-to-point intent between ports 1 and 2 of a randomly chosen device
 * mapped to the given node, matching on the given destination MAC address.
 *
 * @param key     intent key
 * @param mac     destination MAC address to match, as a long
 * @param node    node whose devices are eligible
 * @param devices devices grouped by node
 * @return the new intent
 */
private Intent createIntent(Key key, long mac, NodeId node, Multimap<NodeId, Device> devices) {
    // choose a random device for which this node is master
    List<Device> candidates = new ArrayList<>(devices.get(node));
    int pick = RandomUtils.nextInt(candidates.size());
    Device device = candidates.get(pick);

    //FIXME we currently ignore the path length and always use the same device
    MacAddress destination = MacAddress.valueOf(mac);
    TrafficSelector selector = DefaultTrafficSelector.builder()
            .matchEthDst(destination)
            .build();
    TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();

    ConnectPoint ingress = new ConnectPoint(device.id(), PortNumber.portNumber(1));
    ConnectPoint egress = new ConnectPoint(device.id(), PortNumber.portNumber(2));

    return new PointToPointIntent(appId, key,
                                  selector, treatment,
                                  ingress, egress,
                                  Collections.emptyList(),
                                  Intent.DEFAULT_INTENT_PRIORITY);
}
/**
 * Creates a specified number of intents for testing purposes.
 * <p>
 * Candidate keys are tried in ascending order starting at {@code firstKey}; a key is
 * used only when its partition leader is one of the selected neighbors and that
 * neighbor has not yet reached its share of the intents. The last key used is
 * remembered in {@code lastKey}.
 *
 * @param numberOfKeys number of intents
 * @param pathLength path depth (currently unused)
 * @param firstKey first key to attempt
 * @return set of intents
 * @throws IllegalStateException if there is no node to generate intents for, if one of
 *         the selected nodes has no available device, or if the number of generated
 *         intents does not match {@code numberOfKeys}
 */
private Set<Intent> createIntents(int numberOfKeys, int pathLength, int firstKey) {
    List<NodeId> neighbors = getNeighbors();
    // With no eligible node the key search below could never accept a key.
    checkState(!neighbors.isEmpty(), "There are no nodes to generate intents for");

    Multimap<NodeId, Device> devices = ArrayListMultimap.create();
    deviceService.getAvailableDevices()
            .forEach(device -> devices.put(mastershipService.getMasterFor(device.id()), device));

    // ensure that we have at least one device per neighbor
    // (checkState only substitutes %s placeholders, not SLF4J-style {})
    neighbors.forEach(node -> checkState(!devices.get(node).isEmpty(),
                                         "There are no devices for %s", node));

    // TODO pull this outside so that createIntent can use it
    // prefix based on node id for keys generated on this instance
    long keyPrefix = ((long) clusterService.getLocalNode().ip().getIp4Address().toInt()) << 32;

    int maxKeysPerNode = (int) Math.ceil((double) numberOfKeys / neighbors.size());
    Multimap<NodeId, Intent> intents = ArrayListMultimap.create();
    for (int count = 0, k = firstKey; count < numberOfKeys; k++) {
        Key key = Key.of(keyPrefix + k, appId);

        NodeId leader = partitionService.getLeader(key);
        if (!neighbors.contains(leader) || intents.get(leader).size() >= maxKeysPerNode) {
            // Bail if we are not sending to this node or we have enough for this node
            continue;
        }
        intents.put(leader, createIntent(key, keyPrefix + k, leader, devices));

        // Bump up the counter and remember this as the last key used.
        count++;
        lastKey = k;
        if (count % 1000 == 0) {
            log.info("Building intents... {} (attempt: {})", count, lastKey);
        }
    }
    checkState(intents.size() == numberOfKeys,
               "Generated wrong number of intents: expected %s, got %s",
               numberOfKeys, intents.size());
    log.info("Created {} intents", numberOfKeys);
    intents.keySet().forEach(node -> log.info("\t{}\t{}", node, intents.get(node).size()));

    return Sets.newHashSet(intents.values());
}
/**
 * Worker task that submits and withdraws intents in cycles.
 * <p>
 * It first submits half of its intents, then in each cycle withdraws a random batch of
 * the submitted intents and submits a random batch of the withdrawn ones, until the
 * enclosing installer is stopped. On exit it withdraws everything still submitted.
 */
final class Submitter implements Runnable {

    // Duration of the most recent cycle, in ms.
    private long lastDuration;
    // Number of intents withdrawn and submitted per cycle.
    private int lastCount;

    private Set<Intent> intents = Sets.newHashSet();
    private Set<Intent> submitted = Sets.newHashSet();
    private Set<Intent> withdrawn = Sets.newHashSet();

    // Number of rate adjustments attempted so far.
    int cycleCount = 0;

    /**
     * Creates a submitter working on the given intents.
     *
     * @param intents intents to cycle through
     */
    private Submitter(Set<Intent> intents) {
        this.intents = intents;
        lastCount = numKeys / 4;
        lastDuration = 1_000; // 1 second
    }

    @Override
    public void run() {
        prime();
        while (!stopped) {
            try {
                cycle();
            } catch (Exception e) {
                log.warn("Exception during cycle", e);
            }
        }
        clear();
    }

    /**
     * Picks a random batch of at most {@code lastCount} intents from the given set.
     *
     * @param intents set to pick from
     * @return randomly chosen intents, backed by a copy of the set
     */
    private Iterable<Intent> subset(Set<Intent> intents) {
        List<Intent> shuffled = Lists.newArrayList(intents);
        Collections.shuffle(shuffled);
        // Clamp the batch size so it can never run past the end of the list.
        int batch = Math.max(0, Math.min(lastCount, shuffled.size()));
        return shuffled.subList(0, batch);
    }

    // Submits the specified intent.
    private void submit(Intent intent) {
        intentService.submit(intent);
        submitted.add(intent);
        withdrawn.remove(intent); //TODO could check result here...
    }

    // Withdraws the specified intent.
    private void withdraw(Intent intent) {
        intentService.withdraw(intent);
        withdrawn.add(intent);
        submitted.remove(intent); //TODO could check result here...
    }

    // Primes the cycle by submitting half of the intents.
    private void prime() {
        withdrawn.addAll(intents);
        // only submit half of the intents to start; the limit is checked before
        // submitting so that exactly size / 2 intents go out
        int remaining = intents.size() / 2;
        for (Intent intent : intents) {
            if (remaining-- <= 0) {
                break;
            }
            submit(intent);
        }
    }

    // Withdraws every intent that is still submitted.
    private void clear() {
        // withdraw() removes from 'submitted', so iterate over a snapshot; iterating
        // the live set would fail with ConcurrentModificationException.
        Lists.newArrayList(submitted).forEach(this::withdraw);
    }

    // Runs a single operation cycle.
    private void cycle() {
        //TODO consider running without rate adjustment
        adjustRates();

        long start = currentTimeMillis();
        subset(submitted).forEach(this::withdraw);
        subset(withdrawn).forEach(this::submit);
        long delta = currentTimeMillis() - start;

        if (delta > cyclePeriod * 3 || delta < 0) {
            log.warn("Cycle took {} ms", delta);
        }

        // sleep for whatever is left of the goal cycle period
        int difference = cyclePeriod - (int) delta;
        if (difference > 0) {
            delay(difference);
        }

        lastDuration = delta;
    }

    /**
     * Every fifth call, grows the batch size additively when the backlog between
     * requested and processed throughput is small and the last cycle met the goal
     * period, and shrinks it multiplicatively otherwise.
     */
    private void adjustRates() {
        int addDelta = Math.max(1000 - cycleCount, 10);
        double multRatio = Math.min(0.8 + cycleCount * 0.0002, 0.995);

        //FIXME need to iron out the rate adjustment
        //FIXME we should taper the adjustments over time
        //FIXME don't just use the lastDuration, take an average
        if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec)
            // stop() clears the listener field from another thread; work on one
            // reference so it cannot become null between the reads below.
            Listener current = listener;
            if (current == null) {
                return;
            }
            double requested = current.requestThroughput();
            double processed = current.processedThroughput();

            if (requested - processed <= 2000 && //was 500
                    lastDuration <= cyclePeriod) {
                lastCount = Math.min(lastCount + addDelta, intents.size() / 2);
            } else {
                lastCount *= multRatio;
            }
            log.info("last count: {}, last duration: {} ms (sub: {} vs inst: {})",
                     lastCount, lastDuration, requested, processed);
        }
    }
}
/**
 * Intent listener that counts this application's intent events per event type and
 * turns the counts into throughput figures on every report.
 */
final class Listener implements IntentListener {

    // Accumulates installed and withdrawn totals across all reports.
    private final Counter runningTotal = new Counter();
    // Counters of the current report window; replaced on every report.
    private volatile Map<IntentEvent.Type, Counter> counters;
    private volatile double processedThroughput = 0;
    private volatile double requestThroughput = 0;

    public Listener() {
        counters = initCounters();
    }

    // Builds a map holding a fresh counter for every event type.
    private Map<IntentEvent.Type, Counter> initCounters() {
        Map<IntentEvent.Type, Counter> fresh = Maps.newHashMap();
        for (IntentEvent.Type eventType : IntentEvent.Type.values()) {
            fresh.put(eventType, new Counter());
        }
        return fresh;
    }

    /**
     * Returns the installed plus withdrawn throughput computed by the last report.
     *
     * @return processed throughput
     */
    public double processedThroughput() {
        return processedThroughput;
    }

    /**
     * Returns the install-request plus withdraw-request throughput computed by the
     * last report.
     *
     * @return request throughput
     */
    public double requestThroughput() {
        return requestThroughput;
    }

    @Override
    public void event(IntentEvent event) {
        // only intents of this application are counted
        if (!event.subject().appId().equals(appId)) {
            return;
        }
        counters.get(event.type()).add(1);
    }

    /**
     * Closes the current counting window, updates the throughput figures, logs them
     * and records a sample with the collector.
     */
    public void report() {
        // swap in fresh counters and work on the window that just ended
        Map<IntentEvent.Type, Counter> window = counters;
        counters = initCounters();

        // update running total and latest throughput
        Counter installedCount = window.get(INSTALLED);
        Counter withdrawnCount = window.get(WITHDRAWN);
        processedThroughput = installedCount.throughput() + withdrawnCount.throughput();
        runningTotal.add(installedCount.total() + withdrawnCount.total());

        Counter installRequests = window.get(INSTALL_REQ);
        Counter withdrawRequests = window.get(WITHDRAW_REQ);
        requestThroughput = installRequests.throughput() + withdrawRequests.throughput();

        // build the string to report
        StringBuilder perType = new StringBuilder();
        for (IntentEvent.Type eventType : IntentEvent.Type.values()) {
            double rate = window.get(eventType).throughput();
            perType.append(format("%s=%.2f;", eventType, rate));
        }

        String overall = format("%.2f", runningTotal.throughput());
        String current = format("%.2f", processedThroughput);
        log.info("Throughput: OVERALL={}; CURRENT={}; {}", overall, current, perType);

        sampleCollector.recordSample(runningTotal.throughput(),
                                     processedThroughput);
    }
}
}