[ONOS-6409] Create throughput test for FlowObjective subsystem
Change-Id: I04689683b150a752510dbf20c37071edb890a0b0
diff --git a/apps/test/demo/src/main/java/org/onosproject/demo/DemoApi.java b/apps/test/demo/src/main/java/org/onosproject/demo/DemoApi.java
index 16779e9..f60bdfc 100644
--- a/apps/test/demo/src/main/java/org/onosproject/demo/DemoApi.java
+++ b/apps/test/demo/src/main/java/org/onosproject/demo/DemoApi.java
@@ -35,6 +35,14 @@
JsonNode flowTest(Optional<JsonNode> params);
/**
+ * Tests flow Objective subsystem based on the parameters supplied.
+ *
+ * @param params the test parameters
+ * @return JSON representation
+ */
+ JsonNode flowObjTest(Optional<JsonNode> params);
+
+ /**
* Installs intents based on the installation type.
* @param type the installation type.
* @param runParams run params
diff --git a/apps/test/demo/src/main/java/org/onosproject/demo/DemoInstaller.java b/apps/test/demo/src/main/java/org/onosproject/demo/DemoInstaller.java
index 4fa383a..5051785 100644
--- a/apps/test/demo/src/main/java/org/onosproject/demo/DemoInstaller.java
+++ b/apps/test/demo/src/main/java/org/onosproject/demo/DemoInstaller.java
@@ -17,6 +17,9 @@
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -63,6 +66,16 @@
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.DefaultFilteringObjective;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.DefaultObjectiveContext;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.net.DeviceId;
import org.onosproject.net.host.HostService;
import org.onosproject.net.intent.Constraint;
import org.onosproject.net.intent.HostToHostIntent;
@@ -112,6 +125,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected FlowObjectiveService objectiveService;
+
private ExecutorService worker;
private ExecutorService installWorker;
@@ -123,6 +139,7 @@
private ObjectMapper mapper = new ObjectMapper();
+ private AtomicLong macIndex;
@Activate
@@ -169,6 +186,30 @@
}
@Override
+ public JsonNode flowObjTest(Optional<JsonNode> params) {
+ int flowObjPerDevice = 1000;
+ int neighbours = 0;
+ boolean remove = true;
+ String typeObj = "forward";
+ if (params.isPresent()) {
+ flowObjPerDevice = params.get().get("flowObjPerDevice").asInt();
+ neighbours = params.get().get("neighbours").asInt();
+ remove = params.get().get("remove").asBoolean();
+ typeObj = params.get().get("typeObj").asText().toString();
+ }
+
+ Future<JsonNode> future = worker.submit(new FlowObjTest(flowObjPerDevice, neighbours, remove, typeObj));
+
+ try {
+ return future.get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ ObjectNode node = mapper.createObjectNode();
+ node.put("Error", e.getMessage());
+ return node;
+ }
+ }
+
+ @Override
public void setup(InstallType type, Optional<JsonNode> runParams) {
switch (type) {
case MESH:
@@ -598,6 +639,209 @@
return node;
}
}
+
+ private class FlowObjTest implements Callable<JsonNode> {
+ private final int flowObjPerDevice;
+ private final int neighbours;
+ private final boolean remove;
+ private final String typeObj;
+ Map<DeviceId, Set<ForwardingObjective.Builder>> forwardingObj = new HashMap<>();
+ Map<DeviceId, Set<FilteringObjective.Builder>> filteringObj = new HashMap<>();
+
+ public FlowObjTest(int flowObjPerDevice, int neighbours, boolean remove, String typeObj) {
+ this.flowObjPerDevice = flowObjPerDevice;
+ this.neighbours = neighbours;
+ this.remove = remove;
+ this.typeObj = typeObj;
+ prepareInstallation();
+ }
+
+ private void prepareInstallation() {
+ Set<ControllerNode> instances = Sets.newHashSet(clusterService.getNodes());
+ instances.remove(clusterService.getLocalNode());
+ Set<NodeId> acceptableNodes = Sets.newHashSet();
+ macIndex = new AtomicLong(0);
+ if (neighbours >= instances.size()) {
+ instances.forEach(instance -> acceptableNodes.add(instance.id()));
+ } else {
+ Iterator<ControllerNode> nodes = instances.iterator();
+ for (int i = neighbours; i > 0; i--) {
+ acceptableNodes.add(nodes.next().id());
+ }
+ }
+ acceptableNodes.add(clusterService.getLocalNode().id());
+
+ Set<Device> devices = Sets.newHashSet();
+ for (Device dev : deviceService.getDevices()) {
+ if (acceptableNodes.contains(
+ mastershipService.getMasterFor(dev.id()))) {
+ devices.add(dev);
+ }
+ }
+ for (Device device : devices) {
+ switch (this.typeObj) {
+ case "forward":
+ forwardingObj.put(device.id(), createForward(flowObjPerDevice));
+ break;
+ case "filter":
+ filteringObj.put(device.id(), createFilter(flowObjPerDevice));
+ break;
+ default:
+ log.warn("Unsupported Flow Objective Type");
+ break;
+ }
+ }
+ }
+
+ /*
+ * Method to create forwarding flow objectives.
+ */
+ private Set<ForwardingObjective.Builder> createForward(int flowObjPerDevice) {
+ Set<ForwardingObjective.Builder> fObj = new HashSet<>();
+ for (int i = 0; i < flowObjPerDevice; i++) {
+ TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
+ sbuilder.matchEthSrc(MacAddress.valueOf(macIndex.incrementAndGet()));
+ sbuilder.matchInPort(PortNumber.portNumber(2));
+
+ TrafficTreatment.Builder tbuilder = DefaultTrafficTreatment.builder();
+ tbuilder.add(Instructions.createOutput(PortNumber.portNumber(3)));
+
+ ForwardingObjective.Builder fobBuilder = DefaultForwardingObjective.builder();
+ fobBuilder.withFlag(ForwardingObjective.Flag.SPECIFIC)
+ .withSelector(sbuilder.build())
+ .withTreatment(tbuilder.build())
+ .withPriority(i + 1)
+ .fromApp(appId)
+ .makePermanent();
+
+ fObj.add(fobBuilder);
+ }
+ return fObj;
+ }
+
+ /*
+ *Method to install forwarding flow objectives.
+ */
+ private ObjectNode installForward() {
+ ObjectNode node = mapper.createObjectNode();
+ long addStartTime = System.currentTimeMillis();
+ int totalFlowObj = (flowObjPerDevice * deviceService.getDeviceCount());
+ CountDownLatch installationLatch = new CountDownLatch(totalFlowObj);
+ for (DeviceId dId : forwardingObj.keySet()) {
+ Set<ForwardingObjective.Builder> fObjs = forwardingObj.get(dId);
+ for (ForwardingObjective.Builder builder : fObjs) {
+ ObjectiveContext context = new DefaultObjectiveContext(
+ (objective -> {
+ installationLatch.countDown();
+ })
+ );
+ objectiveService.forward(dId, builder.add(context));
+ }
+ }
+
+ try {
+ installationLatch.await();
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ }
+
+ node.put("elapsed", System.currentTimeMillis() - addStartTime);
+ log.info("{} Forward Flow Objectives elapsed -> {} ms",
+ totalFlowObj, (System.currentTimeMillis() - addStartTime));
+
+ if (this.remove) {
+ for (DeviceId dId : forwardingObj.keySet()) {
+ Set<ForwardingObjective.Builder> fObjs = forwardingObj.get(dId);
+ for (ForwardingObjective.Builder builder : fObjs) {
+ objectiveService.forward(dId, builder.remove());
+ }
+ }
+ }
+ return node;
+
+ }
+
+ /*
+ * Method to create filtering flow objectives.
+ */
+ private Set<FilteringObjective.Builder> createFilter(int flowObjPerDevice) {
+ Set<FilteringObjective.Builder> filterObjSet = new HashSet<>();
+ for (int i = 0; i < flowObjPerDevice; i++) {
+ TrafficTreatment.Builder tbuilder = DefaultTrafficTreatment.builder();
+ tbuilder.add(Instructions.createOutput(PortNumber.portNumber(2)));
+
+ FilteringObjective.Builder fobBuilder = DefaultFilteringObjective.builder();
+ fobBuilder.fromApp(appId)
+ .addCondition(Criteria.matchEthType(2))
+ .withMeta(tbuilder.build())
+ .permit()
+ .withPriority(i + 1)
+ .makePermanent();
+
+ filterObjSet.add(fobBuilder);
+ }
+
+ return filterObjSet;
+ }
+
+ /*
+ * Method to install filtering flow objectives.
+ */
+ private ObjectNode installFilter() {
+ ObjectNode node = mapper.createObjectNode();
+ long addStartTime = System.currentTimeMillis();
+ int totalFlowObj = (flowObjPerDevice * deviceService.getDeviceCount());
+ CountDownLatch installationLatch = new CountDownLatch(totalFlowObj);
+ for (DeviceId dId : filteringObj.keySet()) {
+ Set<FilteringObjective.Builder> fObjs = filteringObj.get(dId);
+ for (FilteringObjective.Builder builder : fObjs) {
+ ObjectiveContext context = new DefaultObjectiveContext(
+ (objective -> {
+ installationLatch.countDown();
+ })
+ );
+ objectiveService.filter(dId, builder.add(context));
+ }
+ }
+
+ try {
+ installationLatch.await();
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ }
+
+ node.put("elapsed", System.currentTimeMillis() - addStartTime);
+ log.info("{} Filter Flow Objectives elapsed -> {} ms",
+ totalFlowObj, (System.currentTimeMillis() - addStartTime));
+
+ if (this.remove) {
+ for (DeviceId dId : filteringObj.keySet()) {
+ Set<FilteringObjective.Builder> fObjs = filteringObj.get(dId);
+ for (FilteringObjective.Builder builder : fObjs) {
+ objectiveService.filter(dId, builder.remove());
+ }
+ }
+ }
+ return node;
+ }
+
+
+ @Override
+ public JsonNode call() throws Exception {
+ ObjectNode node = mapper.createObjectNode();
+ switch (this.typeObj) {
+ case "forward":
+ node = installForward();
+ break;
+ case "filter":
+ node = installFilter();
+ break;
+ default:
+ log.warn("Unsupported Flow Objective Type");
+ }
+ return node;
+ }
+ }
}
diff --git a/apps/test/demo/src/main/java/org/onosproject/demo/DemoResource.java b/apps/test/demo/src/main/java/org/onosproject/demo/DemoResource.java
index 2ecd677..5ce3ab8 100644
--- a/apps/test/demo/src/main/java/org/onosproject/demo/DemoResource.java
+++ b/apps/test/demo/src/main/java/org/onosproject/demo/DemoResource.java
@@ -56,6 +56,24 @@
}
/**
+ * Start the flow objective test.
+ *
+ * @param input JSON describing how to run the flow objective test
+ * @return response code OK
+ * @throws IOException if the JSON processing fails
+ */
+ @POST
+ @Path("flowObjTest")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response flowObjTest(InputStream input) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode cfg = mapper.readTree(input);
+ DemoApi demo = get(DemoApi.class);
+ return Response.ok(demo.flowObjTest(Optional.ofNullable(cfg)).toString()).build();
+ }
+
+ /**
* Set up the flow test.
*
* @param input JSON describing how to configure the flow test
diff --git a/tools/test/bin/flow-obj-tester.py b/tools/test/bin/flow-obj-tester.py
new file mode 100644
index 0000000..76f01ad
--- /dev/null
+++ b/tools/test/bin/flow-obj-tester.py
@@ -0,0 +1,48 @@
+import concurrent.futures
+import requests, json
+from optparse import OptionParser
+
+def run(url, request):
+ data = json.dumps(request)
+ r = requests.post(url, data)
+ return r
+
+def runTasks(flowObjPerDevice, typeObj, neighbours, url, servers, doJson, remove):
+ # We can use a with statement to ensure threads are cleaned up promptly
+ with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
+ # Start the load operations and mark each future with its URL
+ request = { "flowObjPerDevice" : flowObjPerDevice, "typeObj" : typeObj, "neighbours" : neighbours, "remove" : remove }
+ future_to_url = {executor.submit(run, url % (server), request) for server in servers}
+ for f in concurrent.futures.as_completed(future_to_url):
+ try:
+ response = f.result()
+ server = response.url.split('//')[1].split(':')[0]
+ if (doJson):
+ print (json.dumps({ "server" : server, "elapsed" : response.json()['elapsed'] }))
+ else:
+ print ("%s -> %sms" % (server, response.json()['elapsed']))
+ except Exception as exc:
+ print("Execution failed -> %s" % exc)
+
+if __name__ == "__main__":
+ parser = OptionParser()
+ parser.add_option("-u", "--url", dest="url", help="set the url for the request",
+ default="http://%s:8181/onos/demo/intents/flowObjTest")
+ parser.add_option("-f", "--flowObj", dest="flowObj", help="Number of flow objectives to install per device",
+ default=100, type="int")
+ parser.add_option("-n", "--neighbours", dest="neighs", help="Number of neighbours to communicate to",
+ default=0, type="int")
+ parser.add_option("-s", "--servers", dest="servers", help="List of servers to hit",
+ default=[], action="append")
+ parser.add_option("-r", "--remove", dest="remove", help="Whether to remove flow objectives after installation",
+ default=True, action="store_false")
+ parser.add_option("-j", "--json", dest="doJson", help="Print results in json",
+ default=False, action="store_true")
+ parser.add_option("-t", "--typeObj", dest="typeObj", help="Type of Objective to install",
+ default="forward", type="string")
+
+ (options, args) = parser.parse_args()
+ if (len(options.servers) == 0):
+ options.servers.append("localhost")
+ runTasks(options.flowObj, options.typeObj, options.neighs, options.url, options.servers, options.doJson, options.remove)
+