Added test code to test the flow subsystem performance
Change-Id: I1a7c68492760a63b7d092c3ca71e4964123c8aa7
diff --git a/apps/demo/pom.xml b/apps/demo/pom.xml
index e78ad3b..e66ccc2 100644
--- a/apps/demo/pom.xml
+++ b/apps/demo/pom.xml
@@ -106,6 +106,7 @@
com.sun.jersey.server.impl.container.servlet,
com.fasterxml.jackson.databind,
com.fasterxml.jackson.databind.node,
+ org.apache.commons.lang.math.*,
com.google.common.*,
org.onlab.packet.*,
org.onlab.rest.*,
diff --git a/apps/demo/src/main/java/org/onosproject/demo/DemoAPI.java b/apps/demo/src/main/java/org/onosproject/demo/DemoAPI.java
index e410b02..5812eeb 100644
--- a/apps/demo/src/main/java/org/onosproject/demo/DemoAPI.java
+++ b/apps/demo/src/main/java/org/onosproject/demo/DemoAPI.java
@@ -27,6 +27,12 @@
enum InstallType { MESH, RANDOM };
/**
+ * Tests flow subsystem based on the parameters supplied.
+ * @param params the test parameters
+ */
+ JsonNode flowTest(Optional<JsonNode> params);
+
+ /**
* Installs intents based on the installation type.
* @param type the installation type.
* @param runParams run params
diff --git a/apps/demo/src/main/java/org/onosproject/demo/DemoInstaller.java b/apps/demo/src/main/java/org/onosproject/demo/DemoInstaller.java
index fe32eef..10ac5a3 100644
--- a/apps/demo/src/main/java/org/onosproject/demo/DemoInstaller.java
+++ b/apps/demo/src/main/java/org/onosproject/demo/DemoInstaller.java
@@ -16,27 +16,40 @@
package org.onosproject.demo;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Predicate;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-
+import org.onlab.packet.MacAddress;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.Device;
import org.onosproject.net.Host;
import org.onosproject.net.HostId;
import org.onosproject.net.MastershipRole;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
+import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.host.HostService;
@@ -48,7 +61,6 @@
import org.onosproject.net.intent.IntentService;
import org.slf4j.Logger;
-
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -59,11 +71,14 @@
import java.util.Optional;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.TimeoutException;
import static org.slf4j.LoggerFactory.getLogger;
@@ -94,15 +109,23 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected FlowRuleService flowService;
+
private ExecutorService worker;
- private ExecutorService randomWorker;
+ private ExecutorService installWorker;
private ApplicationId appId;
private final Set<Intent> existingIntents = new HashSet<>();
private RandomInstaller randomInstaller;
+ private ObjectMapper mapper = new ObjectMapper();
+
@Activate
@@ -120,13 +143,33 @@
@Deactivate
public void deactivate() {
shutdownAndAwaitTermination(worker);
- if (!randomWorker.isShutdown()) {
- shutdownAndAwaitTermination(randomWorker);
+ if (installWorker != null && !installWorker.isShutdown()) {
+ shutdownAndAwaitTermination(installWorker);
}
log.info("Stopped");
}
@Override
+ public JsonNode flowTest(Optional<JsonNode> params) {
+ int flowsPerDevice = 1000;
+ int neighbours = 0;
+ if (params.isPresent()) {
+ flowsPerDevice = params.get().get("flowsPerDevice").asInt();
+ neighbours = params.get().get("neighbours").asInt();
+ }
+
+ Future<JsonNode> future = worker.submit(new FlowTest(flowsPerDevice, neighbours));
+
+ try {
+ return future.get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ ObjectNode node = mapper.createObjectNode();
+ node.put("Error", e.getMessage());
+ return node;
+ }
+ }
+
+ @Override
public void setup(InstallType type, Optional<JsonNode> runParams) {
switch (type) {
case MESH:
@@ -135,14 +178,14 @@
break;
case RANDOM:
//check that we do not have a random installer running
- if (randomWorker == null || randomWorker.isShutdown()) {
- randomWorker = Executors.newFixedThreadPool(1,
+ if (installWorker == null || installWorker.isShutdown()) {
+ installWorker = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat("random-worker")
.build());
log.debug("Installing random sequence of intents");
randomInstaller = new RandomInstaller(runParams);
- randomWorker.execute(randomInstaller);
+ installWorker.execute(randomInstaller);
} else {
log.warn("Random installer is already running");
}
@@ -239,7 +282,7 @@
@Override
public void run() {
- if (!randomWorker.isShutdown()) {
+ if (!installWorker.isShutdown()) {
randomize();
latch = new CountDownLatch(1);
try {
@@ -269,12 +312,12 @@
log.warn("A batch is stuck processing. " +
"pending : {}",
intentBatchService.getPendingOperations());
- shutdownAndAwaitTermination(randomWorker);
+ shutdownAndAwaitTermination(installWorker);
}
}
//if everyting is good proceed.
- if (!randomWorker.isShutdown()) {
- randomWorker.execute(this);
+ if (!installWorker.isShutdown()) {
+ installWorker.execute(this);
}
}
@@ -432,8 +475,8 @@
clearExistingIntents();
}
- if (randomWorker != null && !randomWorker.isShutdown()) {
- shutdownAndAwaitTermination(randomWorker);
+ if (installWorker != null && !installWorker.isShutdown()) {
+ shutdownAndAwaitTermination(installWorker);
randomInstaller.shutdown();
}
}
@@ -470,6 +513,91 @@
}
}
+ private class FlowTest implements Callable<JsonNode> {
+ private final int flowPerDevice;
+ private final int neighbours;
+ private FlowRuleOperations.Builder adds;
+ private FlowRuleOperations.Builder removes;
+
+ public FlowTest(int flowsPerDevice, int neighbours) {
+ this.flowPerDevice = flowsPerDevice;
+ this.neighbours = neighbours;
+ prepareInstallation();
+ }
+
+ private void prepareInstallation() {
+ Set<ControllerNode> instances = Sets.newHashSet(clusterService.getNodes());
+ instances.remove(clusterService.getLocalNode());
+ Set<NodeId> acceptableNodes = Sets.newHashSet();
+ if (neighbours >= instances.size()) {
+ instances.forEach(instance -> acceptableNodes.add(instance.id()));
+ } else {
+ Iterator<ControllerNode> nodes = instances.iterator();
+ for (int i = neighbours; i > 0; i--) {
+ acceptableNodes.add(nodes.next().id());
+ }
+ }
+ acceptableNodes.add(clusterService.getLocalNode().id());
+
+ Set<Device> devices = Sets.newHashSet();
+ for (Device dev : deviceService.getDevices()) {
+ if (acceptableNodes.contains(
+ mastershipService.getMasterFor(dev.id()))) {
+ devices.add(dev);
+ }
+ }
+
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+ .setOutput(PortNumber.portNumber(RandomUtils.nextInt())).build();
+ TrafficSelector.Builder sbuilder;
+ FlowRuleOperations.Builder rules = FlowRuleOperations.builder();
+ FlowRuleOperations.Builder remove = FlowRuleOperations.builder();
+
+ for (Device d : devices) {
+ for (int i = 0; i < this.flowPerDevice; i++) {
+ sbuilder = DefaultTrafficSelector.builder();
+
+ sbuilder.matchEthSrc(MacAddress.valueOf(RandomUtils.nextInt() * i))
+ .matchEthDst(MacAddress.valueOf((Integer.MAX_VALUE - i) * RandomUtils.nextInt()));
+
+
+ int randomPriority = RandomUtils.nextInt();
+ DefaultFlowRule f = new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
+ randomPriority, appId, 10, false);
+ rules.add(f);
+ remove.remove(f);
+
+ }
+ }
+
+ this.adds = rules;
+ this.removes = remove;
+ }
+
+ @Override
+ public JsonNode call() throws Exception {
+ ObjectNode node = mapper.createObjectNode();
+ CountDownLatch latch = new CountDownLatch(1);
+ flowService.apply(adds.build(new FlowRuleOperationsContext() {
+
+ private final Stopwatch timer = Stopwatch.createStarted();
+
+ @Override
+ public void onSuccess(FlowRuleOperations ops) {
+
+ long elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
+ node.put("elapsed", elapsed);
+
+
+ latch.countDown();
+ }
+ }));
+
+ latch.await(10, TimeUnit.SECONDS);
+ flowService.apply(removes.build());
+ return node;
+ }
+ }
}
diff --git a/apps/demo/src/main/java/org/onosproject/demo/DemoResource.java b/apps/demo/src/main/java/org/onosproject/demo/DemoResource.java
index 3f8a3c5..29c55d1 100644
--- a/apps/demo/src/main/java/org/onosproject/demo/DemoResource.java
+++ b/apps/demo/src/main/java/org/onosproject/demo/DemoResource.java
@@ -37,6 +37,18 @@
public class DemoResource extends BaseResource {
+
+ @POST
+ @Path("flowTest")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response flowTest(InputStream input) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode cfg = mapper.readTree(input);
+ DemoAPI demo = get(DemoAPI.class);
+ return Response.ok(demo.flowTest(Optional.ofNullable(cfg)).toString()).build();
+ }
+
@POST
@Path("setup")
@Consumes(MediaType.APPLICATION_JSON)