Pulling PartitionService into API and making IntentPerfInstaller configurable

Change-Id: I9fde28986b6714c0ca4d635d5a3699094e2f0081
diff --git a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
index 80606267..a89445d 100644
--- a/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
+++ b/apps/intent-perf/src/main/java/org/onosproject/intentperf/IntentPerfInstaller.java
@@ -15,21 +15,28 @@
  */
 package org.onosproject.intentperf;
 
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import org.apache.commons.lang.math.RandomUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.onlab.packet.MacAddress;
 import org.onlab.util.Counter;
+import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.Device;
-import org.onosproject.net.MastershipRole;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.DefaultTrafficSelector;
@@ -41,12 +48,12 @@
 import org.onosproject.net.intent.IntentListener;
 import org.onosproject.net.intent.IntentService;
 import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionService;
 import org.onosproject.net.intent.PointToPointIntent;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -55,6 +62,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkState;
 import static java.lang.String.format;
@@ -71,16 +79,37 @@
 @Component(immediate = true)
 public class IntentPerfInstaller {
 
-    //FIXME make this configurable
-    private static final int NUM_WORKERS = 1;
-    private static final int NUM_KEYS = 40_000;
-
-    public static final int START_DELAY = 5_000; // ms
-    private static final int REPORT_PERIOD = 5_000; //ms
-    private static final int GOAL_CYCLE_PERIOD = 1_000; //ms
-
     private final Logger log = getLogger(getClass());
 
+    private static final int DEFAULT_NUM_WORKERS = 1;
+
+    private static final int DEFAULT_NUM_KEYS = 40_000;
+    private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1_000; //ms
+
+    private static final int DEFAULT_NUM_NEIGHBORS = 0;
+
+    private static final int START_DELAY = 5_000; // ms
+    private static final int REPORT_PERIOD = 5_000; //ms
+
+    //FIXME add path length
+
+    @Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS,
+              label = "Number of keys (i.e. unique intents) to generate per instance")
+    private int numKeys = DEFAULT_NUM_KEYS;
+
+    //TODO implement numWorkers property
+//    @Property(name = "numThreads", intValue = DEFAULT_NUM_WORKERS,
+//              label = "Number of installer threads per instance")
+//    private int numWokers = DEFAULT_NUM_WORKERS;
+
+    @Property(name = "cyclePeriod", intValue = DEFAULT_GOAL_CYCLE_PERIOD,
+              label = "Goal for cycle period (in ms)")
+    private int cyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
+
+    @Property(name = "numNeighbors", intValue = DEFAULT_NUM_NEIGHBORS,
+              label = "Number of neighbors to generate intents for")
+    private int numNeighbors = DEFAULT_NUM_NEIGHBORS;
+
     @Reference(cardinality = MANDATORY_UNARY)
     protected CoreService coreService;
 
@@ -93,6 +122,15 @@
     @Reference(cardinality = MANDATORY_UNARY)
     protected DeviceService deviceService;
 
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected PartitionService partitionService;
+
+    @Reference(cardinality = MANDATORY_UNARY)
+    protected ComponentConfigService configService;
+
     private ExecutorService workers;
     private ApplicationId appId;
     private Listener listener;
@@ -105,12 +143,20 @@
 
     @Activate
     public void activate() {
+        configService.registerProperties(getClass());
+
         String nodeId = clusterService.getLocalNode().ip().toString();
         appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId);
 
         reportTimer = new Timer("onos-intent-perf-reporter");
-        workers = Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
-        log.info("Started with Application ID {}", appId.id());
+        workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
+
+
+        // disable flow backups for testing
+        log.info("flow props: {}",
+                 configService.getProperties("org.onosproject.store.flow.impl.DistributedFlowRuleStore"));
+        configService.setProperty("org.onosproject.store.flow.impl.DistributedFlowRuleStore",
+                                  "backupEnabled", "false");
 
         // Schedule delayed start
         reportTimer.schedule(new TimerTask() {
@@ -123,11 +169,22 @@
 
     @Deactivate
     public void deactivate() {
+        configService.unregisterProperties(getClass(), false);
         stop();
-        log.info("Stopped");
+    }
+
+    //FIXME add modified
+
+    private void logConfig(String prefix) {
+        log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
+                 prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
     }
 
     public void start() {
+        // TODO perhaps move to start(), but need to call before logConfig
+        // adjust numNeighbors and generate list of neighbors
+        numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
+
         // perhaps we want to prime before listening...
         // we will need to discard the first few results for priming and warmup
         listener = new Listener();
@@ -144,25 +201,62 @@
 
         // Submit workers
         stopped = false;
-        Set<Device> devices = new HashSet<>();
-        for (int i = 0; i < NUM_WORKERS; i++) {
-            workers.submit(new Submitter(createIntents(NUM_KEYS, 2, lastKey, devices)));
+        for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
+            workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
         }
+        logConfig("Started");
     }
 
     public void stop() {
+        stopped = true;
         if (listener != null) {
             reportTimer.cancel();
             intentService.removeListener(listener);
             listener = null;
             reportTimer = null;
         }
-        stopped = true;
         try {
             workers.awaitTermination(5, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
             log.warn("Failed to stop worker", e);
         }
+        log.info("Stopped");
+    }
+
+    private List<NodeId> getNeighbors() {
+        List<NodeId> nodes = clusterService.getNodes().stream()
+                .map(ControllerNode::id)
+                .collect(Collectors.toCollection(ArrayList::new));
+        // sort neighbors by id
+        Collections.sort(nodes, (node1, node2) ->
+                node1.toString().compareTo(node2.toString()));
+        // rotate the local node to index 0
+        Collections.rotate(nodes, -1 * nodes.indexOf(clusterService.getLocalNode().id()));
+        log.info("neighbors (raw): {}", nodes); //TODO remove
+        // generate the sub-list that will contain local node and selected neighbors
+        nodes = nodes.subList(0, numNeighbors + 1);
+        log.info("neighbors: {}", nodes); //TODO remove
+        return nodes;
+    }
+
+
+    private Intent createIntent(Key key, long mac, NodeId node, Multimap<NodeId, Device> devices) {
+        // choose a random device for which this node is master
+        List<Device> deviceList = devices.get(node).stream().collect(Collectors.toList());
+        Device device = deviceList.get(RandomUtils.nextInt(deviceList.size()));
+
+        //FIXME we currently ignore the path length and always use the same device
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthDst(MacAddress.valueOf(mac)).build();
+        TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
+        ConnectPoint ingress = new ConnectPoint(device.id(), PortNumber.portNumber(1));
+        ConnectPoint egress = new ConnectPoint(device.id(), PortNumber.portNumber(2));
+
+        return new PointToPointIntent(appId, key,
+                                      selector, treatment,
+                                      ingress, egress,
+                                      Collections.emptyList(),
+                                      Intent.DEFAULT_INTENT_PRIORITY);
     }
 
     /**
@@ -171,58 +265,55 @@
      * @param numberOfKeys number of intents
      * @param pathLength   path depth
      * @param firstKey     first key to attempt
-     * @param devices      set of previously utilized devices  @return set of intents
+     * @return set of intents
      */
-    private Set<Intent> createIntents(int numberOfKeys, int pathLength,
-                                      int firstKey, Set<Device> devices) {
-        Iterator<Device> deviceItr = deviceService.getAvailableDevices().iterator();
-        Set<Intent> result = new HashSet<>();
+    private Set<Intent> createIntents(int numberOfKeys, int pathLength, int firstKey) {
+        //Set<Intent> result = new HashSet<>();
 
-        Device ingressDevice = null;
-        while (deviceItr.hasNext()) {
-            Device device = deviceItr.next();
-            if (deviceService.getRole(device.id()) == MastershipRole.MASTER &&
-                    !devices.contains(device)) {
-                ingressDevice = device;
-                devices.add(device);
-                break;
-            }
-        }
-        checkState(ingressDevice != null, "There are no local devices");
+        List<NodeId> neighbors = getNeighbors();
 
-        // prefix based on node id
-        long prefix = ((long) clusterService.getLocalNode().ip().getIp4Address().toInt()) << 32;
+        Multimap<NodeId, Device> devices = ArrayListMultimap.create();
+        deviceService.getAvailableDevices().forEach(device ->
+            devices.put(mastershipService.getMasterFor(device.id()), device));
+
+        // ensure that we have at least one device per neighbor
+        neighbors.forEach(node ->
+            checkState(devices.get(node).size() > 0,
+                       "There are no devices for {}", node));
+
+
+        // TODO pull this outside so that createIntent can use it
+        // prefix based on node id for keys generated on this instance
+        long keyPrefix = ((long) clusterService.getLocalNode().ip().getIp4Address().toInt()) << 32;
+
+        int maxKeysPerNode = (int) Math.ceil((double) numberOfKeys / neighbors.size());
+        Multimap<NodeId, Intent> intents = ArrayListMultimap.create();
+
         for (int count = 0, k = firstKey; count < numberOfKeys; k++) {
-            Key key = Key.of(prefix + k, appId);
-            //FIXME comment this out for spread of keys
-            if (!intentService.isLocal(key)) {
-                // Bail if the key is not local
+            Key key = Key.of(keyPrefix + k, appId);
+
+            NodeId leader = partitionService.getLeader(key);
+            if (!neighbors.contains(leader) || intents.get(leader).size() >= maxKeysPerNode) {
+                // Bail if we are not sending to this node or we have enough for this node
                 continue;
             }
-
-            //FIXME we currently ignore the path length and always use the same device
-            TrafficSelector selector = DefaultTrafficSelector.builder()
-                    .matchEthDst(MacAddress.valueOf(count)).build();
-            TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
-            ConnectPoint ingress = new ConnectPoint(ingressDevice.id(), PortNumber.portNumber(1));
-            ConnectPoint egress = new ConnectPoint(ingressDevice.id(), PortNumber.portNumber(2));
-
-            Intent intent = new PointToPointIntent(appId, key,
-                                                   selector, treatment,
-                                                   ingress, egress,
-                                                   Collections.emptyList(),
-                                                   Intent.DEFAULT_INTENT_PRIORITY);
-            result.add(intent);
+            intents.put(leader, createIntent(key, keyPrefix + k, leader, devices));
 
             // Bump up the counter and remember this as the last key used.
             count++;
             lastKey = k;
-            if (lastKey % 1000 == 0) {
-                log.info("Building intents... {} (attempt: {})", lastKey, count);
+            if (count % 1000 == 0) {
+                log.info("Building intents... {} (attempt: {})", count, lastKey);
             }
         }
+        checkState(intents.values().size() == numberOfKeys,
+                   "Generated wrong number of intents");
         log.info("Created {} intents", numberOfKeys);
-        return result;
+
+        //FIXME remove this
+        intents.keySet().forEach(node -> log.info("\t{}\t{}", node, intents.get(node).size()));
+
+        return Sets.newHashSet(intents.values());
     }
 
     // Submits intent operations.
@@ -237,8 +328,8 @@
 
         private Submitter(Set<Intent> intents) {
             this.intents = intents;
-            lastCount = NUM_KEYS / 4;
-            lastDuration = 1000; // 1 second
+            lastCount = numKeys / 4;
+            lastDuration = 1_000; // 1 second
         }
 
         @Override
@@ -247,6 +338,7 @@
             while (!stopped) {
                 cycle();
             }
+            clear();
         }
 
         private Iterable<Intent> subset(Set<Intent> intents) {
@@ -282,6 +374,10 @@
             }
         }
 
+        private void clear() {
+            submitted.forEach(this::withdraw);
+        }
+
         // Runs a single operation cycle.
         private void cycle() {
             //TODO consider running without rate adjustment
@@ -292,11 +388,11 @@
             subset(withdrawn).forEach(this::submit);
             long delta = currentTimeMillis() - start;
 
-            if (delta > GOAL_CYCLE_PERIOD * 3 || delta < 0) {
+            if (delta > cyclePeriod * 3 || delta < 0) {
                 log.warn("Cycle took {} ms", delta);
             }
 
-            int difference = GOAL_CYCLE_PERIOD - (int) delta;
+            int difference = cyclePeriod - (int) delta;
             if (difference > 0) {
                 delay(difference);
             }
@@ -307,9 +403,10 @@
         int cycleCount = 0;
         private void adjustRates() {
             //FIXME need to iron out the rate adjustment
+            //FIXME we should taper the adjustments over time
             if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec)
                 if (listener.requestThroughput() - listener.processedThroughput() <= 2000 && //was 500
-                        lastDuration <= GOAL_CYCLE_PERIOD) {
+                        lastDuration <= cyclePeriod) {
                     lastCount = Math.min(lastCount + 1000, intents.size() / 2);
                 } else {
                     lastCount *= 0.8;
@@ -324,8 +421,8 @@
     // Event listener to monitor throughput.
     final class Listener implements IntentListener {
 
-        private Map<IntentEvent.Type, Counter> counters;
         private final Counter runningTotal = new Counter();
+        private volatile Map<IntentEvent.Type, Counter> counters;
 
         private volatile double processedThroughput = 0;
         private volatile double requestThroughput = 0;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java b/core/api/src/main/java/org/onosproject/net/intent/PartitionService.java
similarity index 93%
rename from core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
rename to core/api/src/main/java/org/onosproject/net/intent/PartitionService.java
index 2ee4434..e963abc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionService.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/PartitionService.java
@@ -13,10 +13,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.store.intent.impl;
+package org.onosproject.net.intent;
 
 import org.onosproject.cluster.NodeId;
-import org.onosproject.net.intent.Key;
 
 /**
  * Service for interacting with the partition-to-instance assignments.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index 717c9fe..2b59fae 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -33,6 +33,7 @@
 import org.onosproject.net.intent.IntentStore;
 import org.onosproject.net.intent.IntentStoreDelegate;
 import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionService;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.ecmap.EventuallyConsistentMap;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index 5fa26a7..72590aa 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -31,6 +31,7 @@
 import org.onosproject.cluster.LeadershipService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;