Adding test apps as submodule to apps

Moving election and intent-perf there

Change-Id: Ia71e98438b33d3a1c5c12b08ae98c32930c4bd81
diff --git a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
new file mode 100644
index 0000000..90eb9d7
--- /dev/null
+++ b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfCollector.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.intentperf;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Collects and distributes performance samples.
+ */
+@Component(immediate = true)
+@Service(value = IntentPerfCollector.class)
+public class IntentPerfCollector {
+
+    private static final long SAMPLE_TIME_WINDOW_MS = 5_000;
+    private final Logger log = getLogger(getClass());
+
+    private static final int MAX_SAMPLES = 1_000;
+
+    private final List<Sample> samples = new LinkedList<>();
+
+    private static final MessageSubject SAMPLE = new MessageSubject("intent-perf-sample");
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService communicationService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
+    protected IntentPerfUi ui;
+
+    // Auxiliary structures used to accrue data for normalized time interval
+    // across all nodes.
+    private long newestTime;
+    private Sample overall;
+    private Sample current;
+
+    private ControllerNode[] nodes;
+    private Map<NodeId, Integer> nodeToIndex;
+
+    private NodeId nodeId;
+    private ExecutorService messageHandlingExecutor;
+
+    @Activate
+    public void activate() {
+        nodeId = clusterService.getLocalNode().id();
+
+        // TODO: replace with shared executor
+        messageHandlingExecutor = Executors.newSingleThreadExecutor(
+                groupedThreads("onos/perf", "message-handler"));
+
+        communicationService.addSubscriber(SAMPLE, new InternalSampleCollector(),
+                                           messageHandlingExecutor);
+
+        nodes = clusterService.getNodes().toArray(new ControllerNode[]{});
+        Arrays.sort(nodes, (a, b) -> a.id().toString().compareTo(b.id().toString()));
+
+        nodeToIndex = new HashMap<>();
+        for (int i = 0; i < nodes.length; i++) {
+            nodeToIndex.put(nodes[i].id(), i);
+        }
+
+        clearSamples();
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        messageHandlingExecutor.shutdown();
+        communicationService.removeSubscriber(SAMPLE);
+        log.info("Stopped");
+    }
+
+    /**
+     * Clears all previously accumulated data.
+     */
+    public void clearSamples() {
+        newestTime = 0;
+        overall = new Sample(0, nodes.length);
+        current = new Sample(0, nodes.length);
+        samples.clear();
+    }
+
+
+    /**
+     * Records a sample point of data about intent operation rate.
+     *
+     * @param overallRate overall rate
+     * @param currentRate current rate
+     */
+    public void recordSample(double overallRate, double currentRate) {
+        long now = System.currentTimeMillis();
+        addSample(now, nodeId, overallRate, currentRate);
+        broadcastSample(now, nodeId, overallRate, currentRate);
+    }
+
+    /**
+     * Returns set of node ids as headers.
+     *
+     * @return node id headers
+     */
+    public List<String> getSampleHeaders() {
+        List<String> headers = new ArrayList<>();
+        for (ControllerNode node : nodes) {
+            headers.add(node.id().toString());
+        }
+        return headers;
+    }
+
+    /**
+     * Returns set of all accumulated samples normalized to the local set of
+     * samples.
+     *
+     * @return accumulated samples
+     */
+    public synchronized List<Sample> getSamples() {
+        return ImmutableList.copyOf(samples);
+    }
+
+    /**
+     * Returns overall throughput performance for each of the cluster nodes.
+     *
+     * @return overall intent throughput
+     */
+    public synchronized Sample getOverall() {
+        return overall;
+    }
+
+    // Records a new sample to our collection of samples
+    private synchronized void addSample(long time, NodeId nodeId,
+                                        double overallRate, double currentRate) {
+        Sample fullSample = createCurrentSampleIfNeeded(time);
+        setSampleData(current, nodeId, currentRate);
+        setSampleData(overall, nodeId, overallRate);
+        pruneSamplesIfNeeded();
+
+        if (fullSample != null && ui != null) {
+            ui.reportSample(fullSample);
+        }
+    }
+
+    private Sample createCurrentSampleIfNeeded(long time) {
+        Sample oldSample = time - newestTime > SAMPLE_TIME_WINDOW_MS || current.isComplete() ? current : null;
+        if (oldSample != null) {
+            newestTime = time;
+            current = new Sample(time, nodes.length);
+            if (oldSample.time > 0) {
+                samples.add(oldSample);
+            }
+        }
+        return oldSample;
+    }
+
+    private void setSampleData(Sample sample, NodeId nodeId, double data) {
+        Integer index = nodeToIndex.get(nodeId);
+        if (index != null) {
+            sample.data[index] = data;
+        }
+    }
+
+    private void pruneSamplesIfNeeded() {
+        if (samples.size() > MAX_SAMPLES) {
+            samples.remove(0);
+        }
+    }
+
+    // Performance data sample.
+    static class Sample {
+        final long time;
+        final double[] data;
+
+        public Sample(long time, int nodeCount) {
+            this.time = time;
+            this.data = new double[nodeCount];
+            Arrays.fill(data, -1);
+        }
+
+        public boolean isComplete() {
+            for (int i = 0; i < data.length; i++) {
+                if (data[i] < 0) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+    private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) {
+        String data = String.format("%d|%f|%f", time, overallRate, currentRate);
+        communicationService.broadcast(new ClusterMessage(nodeId, SAMPLE, data.getBytes()));
+    }
+
+    private class InternalSampleCollector implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            String[] fields = new String(message.payload()).split("\\|");
+            log.debug("Received sample from {}: {}", message.sender(), fields);
+            addSample(Long.parseLong(fields[0]), message.sender(),
+                      Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
+        }
+    }
+}
diff --git a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
new file mode 100644
index 0000000..02aa1b3
--- /dev/null
+++ b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
@@ -0,0 +1,587 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.intentperf;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.MacAddress;
+import org.onlab.util.Counter;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.intent.Intent;
+import org.onosproject.net.intent.IntentEvent;
+import org.onosproject.net.intent.IntentListener;
+import org.onosproject.net.intent.IntentService;
+import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionService;
+import org.onosproject.net.intent.PointToPointIntent;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
+import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
+import static org.onlab.util.Tools.*;
+import static org.onosproject.net.intent.IntentEvent.Type.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Application to test sustained intent throughput.
+ */
+@Component(immediate = true)
+@Service(value = IntentPerfInstaller.class)
+public class IntentPerfInstaller {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final int DEFAULT_NUM_WORKERS = 1;
+
+    private static final int DEFAULT_NUM_KEYS = 40000;
+    private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1000; //ms
+
+    private static final int DEFAULT_NUM_NEIGHBORS = 0;
+
+    private static final int START_DELAY = 5_000; // ms
+    private static final int REPORT_PERIOD = 5_000; //ms
+
+    private static final String START = "start";
+    private static final String STOP = "stop";
+    private static final MessageSubject CONTROL = new MessageSubject("intent-perf-ctl");
+
+    //FIXME add path length
+
+    @Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS,
+            label = "Number of keys (i.e. unique intents) to generate per instance")
+    private int numKeys = DEFAULT_NUM_KEYS;
+
+    //TODO implement numWorkers property
+//    @Property(name = "numThreads", intValue = DEFAULT_NUM_WORKERS,
+//              label = "Number of installer threads per instance")
+//    private int numWokers = DEFAULT_NUM_WORKERS;
+
+    @Property(name = "cyclePeriod", intValue = DEFAULT_GOAL_CYCLE_PERIOD,
+            label = "Goal for cycle period (in ms)")
+    private int cyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
+
+    @Property(name = "numNeighbors", intValue = DEFAULT_NUM_NEIGHBORS,
+            label = "Number of neighbors to generate intents for")
+    private int numNeighbors = DEFAULT_NUM_NEIGHBORS;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected IntentService intentService;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected PartitionService partitionService;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected ComponentConfigService configService;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected IntentPerfCollector sampleCollector;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService communicationService;
+
+    private ExecutorService messageHandlingExecutor;
+
+    private ExecutorService workers;
+    private ApplicationId appId;
+    private Listener listener;
+    private boolean stopped;
+
+    private Timer reportTimer;
+
+    // FIXME this variable isn't shared properly between multiple worker threads
+    private int lastKey = 0;
+
+    private IntentPerfUi perfUi;
+    private NodeId nodeId;
+    private TimerTask reporterTask;
+
+    @Activate
+    public void activate(ComponentContext context) {
+        configService.registerProperties(getClass());
+
+        nodeId = clusterService.getLocalNode().id();
+        appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId.toString());
+
+        // TODO: replace with shared timer
+        reportTimer = new Timer("onos-intent-perf-reporter");
+        workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
+
+        // disable flow backups for testing
+        configService.setProperty("org.onosproject.store.flow.impl.DistributedFlowRuleStore",
+                                  "backupEnabled", "false");
+
+        // TODO: replace with shared executor
+        messageHandlingExecutor = Executors.newSingleThreadExecutor(
+                groupedThreads("onos/perf", "command-handler"));
+
+        communicationService.addSubscriber(CONTROL, new InternalControl(),
+                                           messageHandlingExecutor);
+
+        listener = new Listener();
+        intentService.addListener(listener);
+
+        // TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
+        modify(context);
+    }
+
+    @Deactivate
+    public void deactivate() {
+        stopTestRun();
+
+        configService.unregisterProperties(getClass(), false);
+        messageHandlingExecutor.shutdown();
+        communicationService.removeSubscriber(CONTROL);
+
+        if (listener != null) {
+            reportTimer.cancel();
+            intentService.removeListener(listener);
+            listener = null;
+            reportTimer = null;
+        }
+    }
+
+    @Modified
+    public void modify(ComponentContext context) {
+        if (context == null) {
+            logConfig("Reconfigured");
+            return;
+        }
+
+        Dictionary<?, ?> properties = context.getProperties();
+        int newNumKeys, newCyclePeriod, newNumNeighbors;
+        try {
+            String s = get(properties, "numKeys");
+            newNumKeys = isNullOrEmpty(s) ? numKeys : Integer.parseInt(s.trim());
+
+            s = get(properties, "cyclePeriod");
+            newCyclePeriod = isNullOrEmpty(s) ? cyclePeriod : Integer.parseInt(s.trim());
+
+            s = get(properties, "numNeighbors");
+            newNumNeighbors = isNullOrEmpty(s) ? numNeighbors : Integer.parseInt(s.trim());
+
+        } catch (NumberFormatException | ClassCastException e) {
+            log.warn("Malformed configuration detected; using defaults", e);
+            newNumKeys = DEFAULT_NUM_KEYS;
+            newCyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
+            newNumNeighbors = DEFAULT_NUM_NEIGHBORS;
+        }
+
+        if (newNumKeys != numKeys || newCyclePeriod != cyclePeriod || newNumNeighbors != numNeighbors) {
+            numKeys = newNumKeys;
+            cyclePeriod = newCyclePeriod;
+            numNeighbors = newNumNeighbors;
+            logConfig("Reconfigured");
+        }
+    }
+
+    public void start() {
+        communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, START.getBytes()));
+        startTestRun();
+    }
+
+    public void stop() {
+        communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, STOP.getBytes()));
+        stopTestRun();
+    }
+
+    private void logConfig(String prefix) {
+        log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
+                 prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
+    }
+
+    private void startTestRun() {
+        sampleCollector.clearSamples();
+
+        // adjust numNeighbors and generate list of neighbors
+        numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
+
+        // Schedule reporter task on report period boundary
+        reporterTask = new ReporterTask();
+        reportTimer.scheduleAtFixedRate(reporterTask,
+                                        REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
+                                        REPORT_PERIOD);
+
+        // Submit workers
+        stopped = false;
+        for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
+            workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
+        }
+        log.info("Started test run");
+    }
+
+    private void stopTestRun() {
+        stopped = true;
+        if (reporterTask != null) {
+            reporterTask.cancel();
+            reporterTask = null;
+        }
+
+        try {
+            workers.awaitTermination(5 * cyclePeriod, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            log.warn("Failed to stop worker", e);
+        }
+        log.info("Stopped test run");
+    }
+
+    private List<NodeId> getNeighbors() {
+        List<NodeId> nodes = clusterService.getNodes().stream()
+                .map(ControllerNode::id)
+                .collect(Collectors.toCollection(ArrayList::new));
+        // sort neighbors by id
+        Collections.sort(nodes, (node1, node2) ->
+                node1.toString().compareTo(node2.toString()));
+        // rotate the local node to index 0
+        Collections.rotate(nodes, -1 * nodes.indexOf(clusterService.getLocalNode().id()));
+        log.debug("neighbors (raw): {}", nodes); //TODO remove
+        // generate the sub-list that will contain local node and selected neighbors
+        nodes = nodes.subList(0, numNeighbors + 1);
+        log.debug("neighbors: {}", nodes); //TODO remove
+        return nodes;
+    }
+
+    private Intent createIntent(Key key, long mac, NodeId node, Multimap<NodeId, Device> devices) {
+        // choose a random device for which this node is master
+        List<Device> deviceList = devices.get(node).stream().collect(Collectors.toList());
+        Device device = deviceList.get(RandomUtils.nextInt(deviceList.size()));
+
+        //FIXME we currently ignore the path length and always use the same device
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthDst(MacAddress.valueOf(mac)).build();
+        TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
+        ConnectPoint ingress = new ConnectPoint(device.id(), PortNumber.portNumber(1));
+        ConnectPoint egress = new ConnectPoint(device.id(), PortNumber.portNumber(2));
+
+        return PointToPointIntent.builder()
+                .appId(appId)
+                .key(key)
+                .selector(selector)
+                .treatment(treatment)
+                .ingressPoint(ingress)
+                .egressPoint(egress)
+                .build();
+    }
+
+    /**
+     * Creates a specified number of intents for testing purposes.
+     *
+     * @param numberOfKeys number of intents
+     * @param pathLength   path depth
+     * @param firstKey     first key to attempt
+     * @return set of intents
+     */
+    private Set<Intent> createIntents(int numberOfKeys, int pathLength, int firstKey) {
+        List<NodeId> neighbors = getNeighbors();
+
+        Multimap<NodeId, Device> devices = ArrayListMultimap.create();
+        deviceService.getAvailableDevices()
+                .forEach(device -> devices.put(mastershipService.getMasterFor(device.id()), device));
+
+        // ensure that we have at least one device per neighbor
+        neighbors.forEach(node -> checkState(devices.get(node).size() > 0,
+                                             "There are no devices for {}", node));
+
+        // TODO pull this outside so that createIntent can use it
+        // prefix based on node id for keys generated on this instance
+        long keyPrefix = ((long) clusterService.getLocalNode().ip().getIp4Address().toInt()) << 32;
+
+        int maxKeysPerNode = (int) Math.ceil((double) numberOfKeys / neighbors.size());
+        Multimap<NodeId, Intent> intents = ArrayListMultimap.create();
+
+        for (int count = 0, k = firstKey; count < numberOfKeys; k++) {
+            Key key = Key.of(keyPrefix + k, appId);
+
+            NodeId leader = partitionService.getLeader(key);
+            if (!neighbors.contains(leader) || intents.get(leader).size() >= maxKeysPerNode) {
+                // Bail if we are not sending to this node or we have enough for this node
+                continue;
+            }
+            intents.put(leader, createIntent(key, keyPrefix + k, leader, devices));
+
+            // Bump up the counter and remember this as the last key used.
+            count++;
+            lastKey = k;
+            if (count % 1000 == 0) {
+                log.info("Building intents... {} (attempt: {})", count, lastKey);
+            }
+        }
+        checkState(intents.values().size() == numberOfKeys,
+                   "Generated wrong number of intents");
+        log.info("Created {} intents", numberOfKeys);
+        intents.keySet().forEach(node -> log.info("\t{}\t{}", node, intents.get(node).size()));
+
+        return Sets.newHashSet(intents.values());
+    }
+
+    // Submits intent operations.
+    final class Submitter implements Runnable {
+
+        private long lastDuration;
+        private int lastCount;
+
+        private Set<Intent> intents = Sets.newHashSet();
+        private Set<Intent> submitted = Sets.newHashSet();
+        private Set<Intent> withdrawn = Sets.newHashSet();
+
+        private Submitter(Set<Intent> intents) {
+            this.intents = intents;
+            lastCount = numKeys / 4;
+            lastDuration = 1_000; // 1 second
+        }
+
+        @Override
+        public void run() {
+            prime();
+            while (!stopped) {
+                try {
+                    cycle();
+                } catch (Exception e) {
+                    log.warn("Exception during cycle", e);
+                }
+            }
+            clear();
+        }
+
+        private Iterable<Intent> subset(Set<Intent> intents) {
+            List<Intent> subset = Lists.newArrayList(intents);
+            Collections.shuffle(subset);
+            return subset.subList(0, lastCount);
+        }
+
+        // Submits the specified intent.
+        private void submit(Intent intent) {
+            intentService.submit(intent);
+            submitted.add(intent);
+            withdrawn.remove(intent); //TODO could check result here...
+        }
+
+        // Withdraws the specified intent.
+        private void withdraw(Intent intent) {
+            intentService.withdraw(intent);
+            withdrawn.add(intent);
+            submitted.remove(intent); //TODO could check result here...
+        }
+
+        // Primes the cycle.
+        private void prime() {
+            int i = 0;
+            withdrawn.addAll(intents);
+            for (Intent intent : intents) {
+                submit(intent);
+                // only submit half of the intents to start
+                if (i++ >= intents.size() / 2) {
+                    break;
+                }
+            }
+        }
+
+        private void clear() {
+            submitted.forEach(this::withdraw);
+        }
+
+        // Runs a single operation cycle.
+        private void cycle() {
+            //TODO consider running without rate adjustment
+            adjustRates();
+
+            long start = currentTimeMillis();
+            subset(submitted).forEach(this::withdraw);
+            subset(withdrawn).forEach(this::submit);
+            long delta = currentTimeMillis() - start;
+
+            if (delta > cyclePeriod * 3 || delta < 0) {
+                log.warn("Cycle took {} ms", delta);
+            }
+
+            int difference = cyclePeriod - (int) delta;
+            if (difference > 0) {
+                delay(difference);
+            }
+
+            lastDuration = delta;
+        }
+
+        int cycleCount = 0;
+
+        private void adjustRates() {
+
+            int addDelta = Math.max(1000 - cycleCount, 10);
+            double multRatio = Math.min(0.8 + cycleCount * 0.0002, 0.995);
+
+            //FIXME need to iron out the rate adjustment
+            //FIXME we should taper the adjustments over time
+            //FIXME don't just use the lastDuration, take an average
+            if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec)
+                if (listener.requestThroughput() - listener.processedThroughput() <= 2000 && //was 500
+                        lastDuration <= cyclePeriod) {
+                    lastCount = Math.min(lastCount + addDelta, intents.size() / 2);
+                } else {
+                    lastCount *= multRatio;
+                }
+                log.info("last count: {}, last duration: {} ms (sub: {} vs inst: {})",
+                         lastCount, lastDuration, listener.requestThroughput(), listener.processedThroughput());
+            }
+
+        }
+    }
+
+    // Event listener to monitor throughput.
+    final class Listener implements IntentListener {
+
+        private final Counter runningTotal = new Counter();
+        private volatile Map<IntentEvent.Type, Counter> counters;
+
+        private volatile double processedThroughput = 0;
+        private volatile double requestThroughput = 0;
+
+        public Listener() {
+            counters = initCounters();
+        }
+
+        private Map<IntentEvent.Type, Counter> initCounters() {
+            Map<IntentEvent.Type, Counter> map = Maps.newHashMap();
+            for (IntentEvent.Type type : IntentEvent.Type.values()) {
+                map.put(type, new Counter());
+            }
+            return map;
+        }
+
+        public double processedThroughput() {
+            return processedThroughput;
+        }
+
+        public double requestThroughput() {
+            return requestThroughput;
+        }
+
+        @Override
+        public void event(IntentEvent event) {
+            if (event.subject().appId().equals(appId)) {
+                counters.get(event.type()).add(1);
+            }
+        }
+
+        public void report() {
+            Map<IntentEvent.Type, Counter> reportCounters = counters;
+            counters = initCounters();
+
+            // update running total and latest throughput
+            Counter installed = reportCounters.get(INSTALLED);
+            Counter withdrawn = reportCounters.get(WITHDRAWN);
+            processedThroughput = installed.throughput() + withdrawn.throughput();
+            runningTotal.add(installed.total() + withdrawn.total());
+
+            Counter installReq = reportCounters.get(INSTALL_REQ);
+            Counter withdrawReq = reportCounters.get(WITHDRAW_REQ);
+            requestThroughput = installReq.throughput() + withdrawReq.throughput();
+
+            // build the string to report
+            StringBuilder stringBuilder = new StringBuilder();
+            for (IntentEvent.Type type : IntentEvent.Type.values()) {
+                Counter counter = reportCounters.get(type);
+                stringBuilder.append(format("%s=%.2f;", type, counter.throughput()));
+            }
+            log.info("Throughput: OVERALL={}; CURRENT={}; {}",
+                     format("%.2f", runningTotal.throughput()),
+                     format("%.2f", processedThroughput),
+                     stringBuilder);
+
+            sampleCollector.recordSample(runningTotal.throughput(),
+                                         processedThroughput);
+        }
+    }
+
+    private class InternalControl implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            String cmd = new String(message.payload());
+            log.info("Received command {}", cmd);
+            if (cmd.equals(START)) {
+                startTestRun();
+            } else {
+                stopTestRun();
+            }
+        }
+    }
+
+    private class ReporterTask extends TimerTask {
+        @Override
+        public void run() {
+            //adjustRates(); // FIXME we currently adjust rates in the cycle thread
+            listener.report();
+        }
+    }
+
+}
diff --git a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfListCommand.java b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfListCommand.java
new file mode 100644
index 0000000..a556712
--- /dev/null
+++ b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfListCommand.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.intentperf;
+
+import org.apache.karaf.shell.commands.Command;
+import org.apache.karaf.shell.commands.Option;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.intentperf.IntentPerfCollector.Sample;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Displays accumulated performance metrics.
+ */
+@Command(scope = "onos", name = "intent-perf",
+        description = "Displays accumulated performance metrics")
+public class IntentPerfListCommand extends AbstractShellCommand {
+
+    @Option(name = "-s", aliases = "--summary", description = "Output just summary",
+            required = false, multiValued = false)
+    private boolean summary = false;
+
+    @Override
+    protected void execute() {
+        if (summary) {
+            printSummary();
+        } else {
+            printSamples();
+        }
+    }
+
+    private void printSummary() {
+        IntentPerfCollector collector = get(IntentPerfCollector.class);
+        List<String> headers = collector.getSampleHeaders();
+        Sample overall = collector.getOverall();
+        double total = 0;
+        print("%12s: %14s", "Node ID", "Overall Rate");
+        for (int i = 0; i < overall.data.length; i++) {
+            if (overall.data[i] >= 0) {
+                print("%12s: %14.2f", headers.get(i), overall.data[i]);
+                total += overall.data[i];
+            } else {
+                print("%12s: %14s", headers.get(i), " ");
+            }
+        }
+        print("%12s: %14.2f", "total", total);
+    }
+
+    private void printSamples() {
+        IntentPerfCollector collector = get(IntentPerfCollector.class);
+        List<String> headers = collector.getSampleHeaders();
+        List<Sample> samples = collector.getSamples();
+
+        System.out.print(String.format("%10s  ", "Time"));
+        for (String header : headers) {
+            System.out.print(String.format("%12s  ", header));
+        }
+        System.out.println(String.format("%12s", "Total"));
+
+        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
+        for (Sample sample : samples) {
+            double total = 0;
+            System.out.print(String.format("%10s  ", sdf.format(new Date(sample.time))));
+            for (int i = 0; i < sample.data.length; i++) {
+                if (sample.data[i] >= 0) {
+                    System.out.print(String.format("%12.2f  ", sample.data[i]));
+                    total += sample.data[i];
+                } else {
+                    System.out.print(String.format("%12s  ", " "));
+                }
+            }
+            System.out.println(String.format("%12.2f", total));
+        }
+    }
+
+}
diff --git a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStartCommand.java b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStartCommand.java
new file mode 100644
index 0000000..3549153
--- /dev/null
+++ b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStartCommand.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.intentperf;
+
+import org.apache.karaf.shell.commands.Command;
+import org.onosproject.cli.AbstractShellCommand;
+
+/**
+ * Starts intent performance test run.
+ */
+@Command(scope = "onos", name = "intent-perf-start",
+        description = "Starts intent performance test run")
+public class IntentPerfStartCommand extends AbstractShellCommand {
+
+    @Override
+    protected void execute() {
+        get(IntentPerfInstaller.class).start();
+    }
+
+}
diff --git a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStopCommand.java b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStopCommand.java
new file mode 100644
index 0000000..ac45cd8
--- /dev/null
+++ b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfStopCommand.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.intentperf;
+
+import org.apache.karaf.shell.commands.Command;
+import org.onosproject.cli.AbstractShellCommand;
+
+/**
+ * Stops intent performance test run.
+ */
+@Command(scope = "onos", name = "intent-perf-stop",
+        description = "Stops intent performance test run")
+public class IntentPerfStopCommand extends AbstractShellCommand {
+
+    @Override
+    protected void execute() {
+        get(IntentPerfInstaller.class).stop();
+    }
+
+}
diff --git a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfUi.java b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfUi.java
new file mode 100644
index 0000000..d30fe4e
--- /dev/null
+++ b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfUi.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.intentperf;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.osgi.ServiceDirectory;
+import org.onosproject.intentperf.IntentPerfCollector.Sample;
+import org.onosproject.ui.UiConnection;
+import org.onosproject.ui.UiExtension;
+import org.onosproject.ui.UiExtensionService;
+import org.onosproject.ui.UiMessageHandler;
+import org.onosproject.ui.UiView;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static java.util.Collections.synchronizedSet;
+
+/**
+ * Mechanism to stream data to the GUI.
+ */
+@Component(immediate = true, enabled = false)
+public class IntentPerfUi {
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected UiExtensionService uiExtensionService;
+
+    private final Set<StreamingControl> handlers = synchronizedSet(new HashSet<>());
+
+    private List<UiView> views = ImmutableList.of(new UiView("intentPerf", "Intent Performance"));
+    private UiExtension uiExtension = new UiExtension(views, this::newHandlers,
+                                                      getClass().getClassLoader());
+
+    @Activate
+    protected void activate() {
+        uiExtensionService.register(uiExtension);
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        uiExtensionService.unregister(uiExtension);
+    }
+
+    /**
+     * Reports a single sample of performance data.
+     *
+     * @param sample performance sample
+     */
+    public void reportSample(Sample sample) {
+        synchronized (handlers) {
+            handlers.forEach(h -> h.send(sample));
+        }
+    }
+
+    // Creates and returns session specific message handler.
+    private Collection<UiMessageHandler> newHandlers() {
+        return ImmutableList.of(new StreamingControl());
+    }
+
+    // UI Message handlers for turning on/off reporting to a session.
+    private class StreamingControl extends UiMessageHandler {
+
+        private boolean streamingEnabled = false;
+
+        protected StreamingControl() {
+            super(ImmutableSet.of("intentPerfStart", "intentPerfStop"));
+        }
+
+        @Override
+        public void process(ObjectNode message) {
+            streamingEnabled = message.path("event").asText("unknown").equals("initPerfStart");
+        }
+
+        @Override
+        public void init(UiConnection connection, ServiceDirectory directory) {
+            super.init(connection, directory);
+            handlers.add(this);
+        }
+
+        @Override
+        public void destroy() {
+            super.destroy();
+            handlers.remove(this);
+        }
+
+        private void send(Sample sample) {
+            // FIXME: finish this
+            ObjectNode sn = mapper.createObjectNode()
+                    .put("time", sample.time);
+            connection().sendMessage("intentPerf", 0, sn);
+        }
+    }
+
+}
diff --git a/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/package-info.java b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/package-info.java
new file mode 100644
index 0000000..75d40e4
--- /dev/null
+++ b/apps/test/intent-perf/src/main/java/org/onosproject/intentperf/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Performance test application that induces steady load on the intent subsystem.
+ */
+package org.onosproject.intentperf;
\ No newline at end of file