Added test code to test the flow subsystem performance

Change-Id: I1a7c68492760a63b7d092c3ca71e4964123c8aa7
diff --git a/apps/demo/src/main/java/org/onosproject/demo/DemoInstaller.java b/apps/demo/src/main/java/org/onosproject/demo/DemoInstaller.java
index fe32eef..10ac5a3 100644
--- a/apps/demo/src/main/java/org/onosproject/demo/DemoInstaller.java
+++ b/apps/demo/src/main/java/org/onosproject/demo/DemoInstaller.java
@@ -16,27 +16,40 @@
 package org.onosproject.demo;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Predicate;
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang.math.RandomUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-
+import org.onlab.packet.MacAddress;
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.Device;
 import org.onosproject.net.Host;
 import org.onosproject.net.HostId;
 import org.onosproject.net.MastershipRole;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultFlowRule;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
+import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.host.HostService;
@@ -48,7 +61,6 @@
 import org.onosproject.net.intent.IntentService;
 import org.slf4j.Logger;
 
-
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -59,11 +71,14 @@
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.TimeoutException;
 
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -94,15 +109,23 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected FlowRuleService flowService;
+
     private ExecutorService worker;
 
-    private ExecutorService randomWorker;
+    private ExecutorService installWorker;
 
     private ApplicationId appId;
 
     private final Set<Intent> existingIntents = new HashSet<>();
     private RandomInstaller randomInstaller;
 
+    private ObjectMapper mapper = new ObjectMapper();
+
 
 
     @Activate
@@ -120,13 +143,33 @@
     @Deactivate
     public void deactivate() {
         shutdownAndAwaitTermination(worker);
-        if (!randomWorker.isShutdown()) {
-            shutdownAndAwaitTermination(randomWorker);
+        if (installWorker != null && !installWorker.isShutdown()) {
+            shutdownAndAwaitTermination(installWorker);
         }
         log.info("Stopped");
     }
 
     @Override
+    public JsonNode flowTest(Optional<JsonNode> params) {
+        int flowsPerDevice = 1000;
+        int neighbours = 0;
+        if (params.isPresent()) {
+            flowsPerDevice = params.get().get("flowsPerDevice").asInt();
+            neighbours = params.get().get("neighbours").asInt();
+        }
+
+        Future<JsonNode> future = worker.submit(new FlowTest(flowsPerDevice, neighbours));
+
+        try {
+            return future.get(10, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            ObjectNode node = mapper.createObjectNode();
+            node.put("Error", e.getMessage());
+            return node;
+        }
+    }
+
+    @Override
     public void setup(InstallType type, Optional<JsonNode> runParams) {
         switch (type) {
             case MESH:
@@ -135,14 +178,14 @@
                 break;
             case RANDOM:
                 //check that we do not have a random installer running
-                if (randomWorker == null || randomWorker.isShutdown()) {
-                    randomWorker = Executors.newFixedThreadPool(1,
+                if (installWorker == null || installWorker.isShutdown()) {
+                    installWorker = Executors.newFixedThreadPool(1,
                                                    new ThreadFactoryBuilder()
                                                            .setNameFormat("random-worker")
                                                            .build());
                     log.debug("Installing random sequence of intents");
                     randomInstaller = new RandomInstaller(runParams);
-                    randomWorker.execute(randomInstaller);
+                    installWorker.execute(randomInstaller);
                 } else {
                     log.warn("Random installer is already running");
                 }
@@ -239,7 +282,7 @@
 
         @Override
         public void run() {
-            if (!randomWorker.isShutdown()) {
+            if (!installWorker.isShutdown()) {
                 randomize();
                 latch = new CountDownLatch(1);
                 try {
@@ -269,12 +312,12 @@
                     log.warn("A batch is stuck processing. " +
                                      "pending : {}",
                              intentBatchService.getPendingOperations());
-                    shutdownAndAwaitTermination(randomWorker);
+                    shutdownAndAwaitTermination(installWorker);
                 }
             }
             //if everyting is good proceed.
-            if (!randomWorker.isShutdown()) {
-                randomWorker.execute(this);
+            if (!installWorker.isShutdown()) {
+                installWorker.execute(this);
             }
 
         }
@@ -432,8 +475,8 @@
                 clearExistingIntents();
             }
 
-            if (randomWorker != null && !randomWorker.isShutdown()) {
-                shutdownAndAwaitTermination(randomWorker);
+            if (installWorker != null && !installWorker.isShutdown()) {
+                shutdownAndAwaitTermination(installWorker);
                 randomInstaller.shutdown();
             }
         }
@@ -470,6 +513,91 @@
         }
     }
 
+    private class FlowTest implements Callable<JsonNode> {
+        private final int flowPerDevice;
+        private final int neighbours;
+        private FlowRuleOperations.Builder adds;
+        private FlowRuleOperations.Builder removes;
+
+        public FlowTest(int flowsPerDevice, int neighbours) {
+            this.flowPerDevice = flowsPerDevice;
+            this.neighbours = neighbours;
+            prepareInstallation();
+        }
+
+        private void prepareInstallation() {
+            Set<ControllerNode> instances = Sets.newHashSet(clusterService.getNodes());
+            instances.remove(clusterService.getLocalNode());
+            Set<NodeId> acceptableNodes = Sets.newHashSet();
+            if (neighbours >= instances.size()) {
+                instances.forEach(instance -> acceptableNodes.add(instance.id()));
+            } else {
+                Iterator<ControllerNode> nodes = instances.iterator();
+                for (int i = neighbours; i > 0; i--) {
+                    acceptableNodes.add(nodes.next().id());
+                }
+            }
+            acceptableNodes.add(clusterService.getLocalNode().id());
+
+            Set<Device> devices = Sets.newHashSet();
+            for (Device dev : deviceService.getDevices()) {
+                if (acceptableNodes.contains(
+                        mastershipService.getMasterFor(dev.id()))) {
+                    devices.add(dev);
+                }
+            }
+
+            TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                    .setOutput(PortNumber.portNumber(RandomUtils.nextInt())).build();
+            TrafficSelector.Builder sbuilder;
+            FlowRuleOperations.Builder rules = FlowRuleOperations.builder();
+            FlowRuleOperations.Builder remove = FlowRuleOperations.builder();
+
+            for (Device d : devices) {
+                for (int i = 0; i < this.flowPerDevice; i++) {
+                    sbuilder = DefaultTrafficSelector.builder();
+
+                    sbuilder.matchEthSrc(MacAddress.valueOf(RandomUtils.nextInt() * i))
+                            .matchEthDst(MacAddress.valueOf((Integer.MAX_VALUE - i) * RandomUtils.nextInt()));
+
+
+                    int randomPriority = RandomUtils.nextInt();
+                    DefaultFlowRule f = new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
+                                                            randomPriority, appId, 10, false);
+                    rules.add(f);
+                    remove.remove(f);
+
+                }
+            }
+
+            this.adds = rules;
+            this.removes = remove;
+        }
+
+        @Override
+        public JsonNode call() throws Exception {
+            ObjectNode node = mapper.createObjectNode();
+            CountDownLatch latch = new CountDownLatch(1);
+            flowService.apply(adds.build(new FlowRuleOperationsContext() {
+
+                private final Stopwatch timer = Stopwatch.createStarted();
+
+                @Override
+                public void onSuccess(FlowRuleOperations ops) {
+
+                    long elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
+                    node.put("elapsed", elapsed);
+
+
+                    latch.countDown();
+                }
+            }));
+
+            latch.await(10, TimeUnit.SECONDS);
+            flowService.apply(removes.build());
+            return node;
+        }
+    }
 }