[ONOS-6409] Create throughput test for FlowObjective subsystem

Change-Id: I04689683b150a752510dbf20c37071edb890a0b0
diff --git a/apps/test/demo/src/main/java/org/onosproject/demo/DemoApi.java b/apps/test/demo/src/main/java/org/onosproject/demo/DemoApi.java
index 16779e9..f60bdfc 100644
--- a/apps/test/demo/src/main/java/org/onosproject/demo/DemoApi.java
+++ b/apps/test/demo/src/main/java/org/onosproject/demo/DemoApi.java
@@ -35,6 +35,14 @@
     JsonNode flowTest(Optional<JsonNode> params);
 
     /**
+     * Tests flow Objective subsystem based on the parameters supplied.
+     *
+     * @param params the test parameters
+     * @return JSON representation
+     */
+    JsonNode flowObjTest(Optional<JsonNode> params);
+
+    /**
      * Installs intents based on the installation type.
      * @param type the installation type.
      * @param runParams run params
diff --git a/apps/test/demo/src/main/java/org/onosproject/demo/DemoInstaller.java b/apps/test/demo/src/main/java/org/onosproject/demo/DemoInstaller.java
index 4fa383a..5051785 100644
--- a/apps/test/demo/src/main/java/org/onosproject/demo/DemoInstaller.java
+++ b/apps/test/demo/src/main/java/org/onosproject/demo/DemoInstaller.java
@@ -17,6 +17,9 @@
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -63,6 +66,16 @@
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.DefaultFilteringObjective;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.DefaultObjectiveContext;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.net.DeviceId;
 import org.onosproject.net.host.HostService;
 import org.onosproject.net.intent.Constraint;
 import org.onosproject.net.intent.HostToHostIntent;
@@ -112,6 +125,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected FlowRuleService flowService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected FlowObjectiveService objectiveService;
+
     private ExecutorService worker;
 
     private ExecutorService installWorker;
@@ -123,6 +139,7 @@
 
     private ObjectMapper mapper = new ObjectMapper();
 
+    private AtomicLong macIndex;
 
 
     @Activate
@@ -169,6 +186,30 @@
     }
 
     @Override
+    public JsonNode flowObjTest(Optional<JsonNode> params) {
+        int flowObjPerDevice = 1000;
+        int neighbours = 0;
+        boolean remove = true;
+        String typeObj = "forward";
+        if (params.isPresent()) {
+            flowObjPerDevice = params.get().get("flowObjPerDevice").asInt();
+            neighbours = params.get().get("neighbours").asInt();
+            remove = params.get().get("remove").asBoolean();
+            typeObj = params.get().get("typeObj").asText().toString();
+        }
+
+        Future<JsonNode> future = worker.submit(new FlowObjTest(flowObjPerDevice, neighbours, remove, typeObj));
+
+        try {
+            return future.get(10, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            ObjectNode node = mapper.createObjectNode();
+            node.put("Error", e.getMessage());
+            return node;
+        }
+    }
+
+    @Override
     public void setup(InstallType type, Optional<JsonNode> runParams) {
         switch (type) {
             case MESH:
@@ -598,6 +639,209 @@
             return node;
         }
     }
+
+    private class FlowObjTest implements Callable<JsonNode> {
+        private final int flowObjPerDevice;
+        private final int neighbours;
+        private final boolean remove;
+        private final String typeObj;
+        Map<DeviceId, Set<ForwardingObjective.Builder>> forwardingObj = new HashMap<>();
+        Map<DeviceId, Set<FilteringObjective.Builder>> filteringObj  = new HashMap<>();
+
+        public FlowObjTest(int flowObjPerDevice, int neighbours, boolean remove, String typeObj) {
+            this.flowObjPerDevice = flowObjPerDevice;
+            this.neighbours = neighbours;
+            this.remove = remove;
+            this.typeObj = typeObj;
+            prepareInstallation();
+        }
+
+        private void prepareInstallation() {
+            Set<ControllerNode> instances = Sets.newHashSet(clusterService.getNodes());
+            instances.remove(clusterService.getLocalNode());
+            Set<NodeId> acceptableNodes = Sets.newHashSet();
+            macIndex = new AtomicLong(0);
+            if (neighbours >= instances.size()) {
+                instances.forEach(instance -> acceptableNodes.add(instance.id()));
+            } else {
+                Iterator<ControllerNode> nodes = instances.iterator();
+                for (int i = neighbours; i > 0; i--) {
+                    acceptableNodes.add(nodes.next().id());
+                }
+            }
+            acceptableNodes.add(clusterService.getLocalNode().id());
+
+            Set<Device> devices = Sets.newHashSet();
+            for (Device dev : deviceService.getDevices()) {
+                if (acceptableNodes.contains(
+                        mastershipService.getMasterFor(dev.id()))) {
+                    devices.add(dev);
+                }
+            }
+            for (Device device : devices) {
+                switch (this.typeObj) {
+                    case "forward":
+                        forwardingObj.put(device.id(), createForward(flowObjPerDevice));
+                        break;
+                    case "filter":
+                        filteringObj.put(device.id(), createFilter(flowObjPerDevice));
+                        break;
+                    default:
+                        log.warn("Unsupported Flow Objective Type");
+                        break;
+                }
+            }
+        }
+
+        /*
+         * Method to create forwarding flow objectives.
+         */
+        private Set<ForwardingObjective.Builder> createForward(int flowObjPerDevice) {
+            Set<ForwardingObjective.Builder> fObj = new HashSet<>();
+            for (int i = 0; i < flowObjPerDevice; i++) {
+                TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
+                sbuilder.matchEthSrc(MacAddress.valueOf(macIndex.incrementAndGet()));
+                sbuilder.matchInPort(PortNumber.portNumber(2));
+
+                TrafficTreatment.Builder tbuilder = DefaultTrafficTreatment.builder();
+                tbuilder.add(Instructions.createOutput(PortNumber.portNumber(3)));
+
+                ForwardingObjective.Builder fobBuilder = DefaultForwardingObjective.builder();
+                fobBuilder.withFlag(ForwardingObjective.Flag.SPECIFIC)
+                        .withSelector(sbuilder.build())
+                        .withTreatment(tbuilder.build())
+                        .withPriority(i + 1)
+                        .fromApp(appId)
+                        .makePermanent();
+
+                fObj.add(fobBuilder);
+            }
+            return fObj;
+        }
+
+        /*
+         *Method to install forwarding flow objectives.
+         */
+        private ObjectNode installForward() {
+            ObjectNode node = mapper.createObjectNode();
+            long addStartTime = System.currentTimeMillis();
+            int totalFlowObj = (flowObjPerDevice * deviceService.getDeviceCount());
+            CountDownLatch installationLatch = new CountDownLatch(totalFlowObj);
+            for (DeviceId dId : forwardingObj.keySet()) {
+                Set<ForwardingObjective.Builder> fObjs = forwardingObj.get(dId);
+                for (ForwardingObjective.Builder builder : fObjs) {
+                    ObjectiveContext context = new DefaultObjectiveContext(
+                            (objective -> {
+                                installationLatch.countDown();
+                            })
+                    );
+                    objectiveService.forward(dId, builder.add(context));
+                }
+            }
+
+            try {
+                installationLatch.await();
+            } catch (InterruptedException e) {
+                Thread.interrupted();
+            }
+
+            node.put("elapsed", System.currentTimeMillis() - addStartTime);
+            log.info("{} Forward Flow Objectives elapsed -> {} ms",
+                     totalFlowObj, (System.currentTimeMillis() - addStartTime));
+
+            if (this.remove) {
+                for (DeviceId dId : forwardingObj.keySet()) {
+                    Set<ForwardingObjective.Builder> fObjs = forwardingObj.get(dId);
+                    for (ForwardingObjective.Builder builder : fObjs) {
+                        objectiveService.forward(dId, builder.remove());
+                    }
+                }
+            }
+            return node;
+
+        }
+
+        /*
+         * Method to create filtering flow objectives.
+         */
+        private Set<FilteringObjective.Builder> createFilter(int flowObjPerDevice) {
+            Set<FilteringObjective.Builder> filterObjSet = new HashSet<>();
+            for (int i = 0; i < flowObjPerDevice; i++) {
+                TrafficTreatment.Builder tbuilder = DefaultTrafficTreatment.builder();
+                tbuilder.add(Instructions.createOutput(PortNumber.portNumber(2)));
+
+                FilteringObjective.Builder fobBuilder = DefaultFilteringObjective.builder();
+                fobBuilder.fromApp(appId)
+                          .addCondition(Criteria.matchEthType(2))
+                          .withMeta(tbuilder.build())
+                          .permit()
+                          .withPriority(i + 1)
+                          .makePermanent();
+
+                filterObjSet.add(fobBuilder);
+            }
+
+            return filterObjSet;
+        }
+
+        /*
+         * Method to install filtering flow objectives.
+         */
+        private ObjectNode installFilter() {
+            ObjectNode node = mapper.createObjectNode();
+            long addStartTime = System.currentTimeMillis();
+            int totalFlowObj = (flowObjPerDevice * deviceService.getDeviceCount());
+            CountDownLatch installationLatch = new CountDownLatch(totalFlowObj);
+            for (DeviceId dId : filteringObj.keySet()) {
+                Set<FilteringObjective.Builder> fObjs = filteringObj.get(dId);
+                for (FilteringObjective.Builder builder : fObjs) {
+                    ObjectiveContext context = new DefaultObjectiveContext(
+                            (objective -> {
+                                installationLatch.countDown();
+                            })
+                    );
+                    objectiveService.filter(dId, builder.add(context));
+                }
+            }
+
+            try {
+                installationLatch.await();
+            } catch (InterruptedException e) {
+                Thread.interrupted();
+            }
+
+            node.put("elapsed", System.currentTimeMillis() - addStartTime);
+            log.info("{} Filter Flow Objectives elapsed -> {} ms",
+                     totalFlowObj, (System.currentTimeMillis() - addStartTime));
+
+            if (this.remove) {
+                for (DeviceId dId : filteringObj.keySet()) {
+                    Set<FilteringObjective.Builder> fObjs = filteringObj.get(dId);
+                    for (FilteringObjective.Builder builder : fObjs) {
+                        objectiveService.filter(dId, builder.remove());
+                    }
+                }
+            }
+            return node;
+        }
+
+
+        @Override
+        public JsonNode call() throws Exception {
+            ObjectNode node = mapper.createObjectNode();
+            switch (this.typeObj) {
+                case "forward":
+                    node = installForward();
+                    break;
+                case "filter":
+                    node = installFilter();
+                    break;
+                default:
+                    log.warn("Unsupported Flow Objective Type");
+            }
+            return node;
+        }
+    }
 }
 
 
diff --git a/apps/test/demo/src/main/java/org/onosproject/demo/DemoResource.java b/apps/test/demo/src/main/java/org/onosproject/demo/DemoResource.java
index 2ecd677..5ce3ab8 100644
--- a/apps/test/demo/src/main/java/org/onosproject/demo/DemoResource.java
+++ b/apps/test/demo/src/main/java/org/onosproject/demo/DemoResource.java
@@ -56,6 +56,24 @@
     }
 
     /**
+     * Start the flow objective test.
+     *
+     * @param input JSON describing how to run the flow objective test
+     * @return response code OK
+     * @throws IOException if the JSON processing fails
+     */
+    @POST
+    @Path("flowObjTest")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response flowObjTest(InputStream input) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode cfg = mapper.readTree(input);
+        DemoApi demo = get(DemoApi.class);
+        return Response.ok(demo.flowObjTest(Optional.ofNullable(cfg)).toString()).build();
+    }
+
+    /**
      * Set up the flow test.
      *
      * @param input JSON describing how to configure the flow test
diff --git a/tools/test/bin/flow-obj-tester.py b/tools/test/bin/flow-obj-tester.py
new file mode 100644
index 0000000..76f01ad
--- /dev/null
+++ b/tools/test/bin/flow-obj-tester.py
@@ -0,0 +1,48 @@
+import concurrent.futures
+import requests, json
+from optparse import OptionParser
+
+def run(url, request):
+    data = json.dumps(request)
+    r = requests.post(url, data)
+    return r
+
+def runTasks(flowObjPerDevice, typeObj, neighbours, url, servers, doJson, remove):
+    # We can use a with statement to ensure threads are cleaned up promptly
+    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
+        # Start the load operations and mark each future with its URL
+        request = { "flowObjPerDevice" : flowObjPerDevice, "typeObj" : typeObj, "neighbours" : neighbours, "remove" : remove }
+        future_to_url = {executor.submit(run, url % (server), request) for server in servers}
+        for f in concurrent.futures.as_completed(future_to_url):
+            try:
+                response = f.result()
+                server = response.url.split('//')[1].split(':')[0]
+                if (doJson):
+                    print (json.dumps({ "server" : server, "elapsed" : response.json()['elapsed'] }))
+                else:
+                    print ("%s -> %sms" % (server, response.json()['elapsed']))
+            except Exception as exc:
+                print("Execution failed -> %s" % exc)
+
+if __name__ == "__main__":
+    parser = OptionParser()
+    parser.add_option("-u", "--url", dest="url", help="set the url for the request",
+                            default="http://%s:8181/onos/demo/intents/flowObjTest")
+    parser.add_option("-f", "--flowObj", dest="flowObj", help="Number of flow objectives to install per device",
+                            default=100, type="int")
+    parser.add_option("-n", "--neighbours", dest="neighs", help="Number of neighbours to communicate to",
+                            default=0, type="int")
+    parser.add_option("-s", "--servers", dest="servers", help="List of servers to hit",
+                            default=[], action="append")
+    parser.add_option("-r", "--remove", dest="remove", help="Whether to remove flow objectives after installation",
+                            default=True, action="store_false")
+    parser.add_option("-j", "--json", dest="doJson", help="Print results in json",
+                            default=False, action="store_true")
+    parser.add_option("-t", "--typeObj", dest="typeObj", help="Type of Objective to install",
+                            default="forward", type="string")
+
+    (options, args) = parser.parse_args()
+    if (len(options.servers) == 0):
+        options.servers.append("localhost")
+    runTasks(options.flowObj, options.typeObj, options.neighs, options.url, options.servers, options.doJson, options.remove)
+