random demo intent installer
Change-Id: I1ac2f5a6b7efbc7c940c6d7c371af4f5befcd676
using a timer to pace ourselves
Change-Id: Ia58698f950bbbc958ad002ed56dfe54b90f317ab
all good blasts intents
Change-Id: Ia85df5ad211c01d22d4088403d789b3d6a2292f7
clean up
Change-Id: I1a6dde05f57d0e4866d3255fc28836dfa7e7c190
diff --git a/apps/demo/pom.xml b/apps/demo/pom.xml
index fbc7f91..fe2d916 100644
--- a/apps/demo/pom.xml
+++ b/apps/demo/pom.xml
@@ -109,7 +109,9 @@
- org.onlab.onos.*
+ org.onlab.onos.*,
+ org.onlab.util.*,
+ org.jboss.netty.util.*
diff --git a/apps/demo/src/main/java/org/onlab/onos/demo/DemoAPI.java b/apps/demo/src/main/java/org/onlab/onos/demo/DemoAPI.java
index 2a267f3..ff877da 100644
--- a/apps/demo/src/main/java/org/onlab/onos/demo/DemoAPI.java
+++ b/apps/demo/src/main/java/org/onlab/onos/demo/DemoAPI.java
@@ -1,5 +1,9 @@
package org.onlab.onos.demo;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Optional;
* Simple demo api interface.
@@ -11,7 +15,7 @@
* Installs intents based on the installation type.
* @param type the installation type.
- void setup(InstallType type);
+ void setup(InstallType type, Optional<JsonNode> runParams);
* Uninstalls all existing intents.
diff --git a/apps/demo/src/main/java/org/onlab/onos/demo/DemoInstaller.java b/apps/demo/src/main/java/org/onlab/onos/demo/DemoInstaller.java
index 758f628..de66ee5 100644
--- a/apps/demo/src/main/java/org/onlab/onos/demo/DemoInstaller.java
+++ b/apps/demo/src/main/java/org/onlab/onos/demo/DemoInstaller.java
@@ -15,7 +15,11 @@
package org.onlab.onos.demo;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -23,9 +27,14 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.core.CoreService;
+import org.onlab.onos.mastership.MastershipService;
import org.onlab.onos.net.Host;
+import org.onlab.onos.net.HostId;
+import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.TrafficSelector;
@@ -34,15 +43,27 @@
import org.onlab.onos.net.intent.Constraint;
import org.onlab.onos.net.intent.HostToHostIntent;
import org.onlab.onos.net.intent.Intent;
+import org.onlab.onos.net.intent.IntentBatchService;
+import org.onlab.onos.net.intent.IntentOperations;
import org.onlab.onos.net.intent.IntentService;
import org.slf4j.Logger;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import static org.slf4j.LoggerFactory.getLogger;
@@ -64,17 +85,31 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected IntentBatchService intentBatchService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
private ExecutorService worker;
+ private ExecutorService randomWorker;
private ApplicationId appId;
private final Set<Intent> existingIntents = new HashSet<>();
+ private RandomInstaller randomInstaller;
public void activate() {
- appId = coreService.registerApplication("org.onlab.onos.demo.installer");
+ String nodeId = clusterService.getLocalNode().ip().toString();
+ appId = coreService.registerApplication("org.onlab.onos.demo.installer."
+ + nodeId);
worker = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder()
@@ -84,19 +119,34 @@
public void deactivate() {
- worker.shutdownNow();
+ shutdownAndAwaitTermination(worker);
+ if (!randomWorker.isShutdown()) {
+ shutdownAndAwaitTermination(randomWorker);
+ }
- public void setup(InstallType type) {
+ public void setup(InstallType type, Optional<JsonNode> runParams) {
switch (type) {
case MESH:
log.debug("Installing mesh intents");
worker.execute(new MeshInstaller());
case RANDOM:
- throw new IllegalArgumentException("Not yet implemented.");
+ //check that we do not have a random installer running
+ if (randomWorker == null || randomWorker.isShutdown()) {
+ randomWorker = Executors.newFixedThreadPool(1,
+ new ThreadFactoryBuilder()
+ .setNameFormat("random-worker")
+ .build());
+ log.debug("Installing random sequence of intents");
+ randomInstaller = new RandomInstaller(runParams);
+ randomWorker.execute(randomInstaller);
+ } else {
+ log.warn("Random installer is already running");
+ }
+ break;
throw new IllegalArgumentException("What is it you want exactly?");
@@ -108,6 +158,9 @@
+ /**
+ * Simply installs a mesh of intents from all the hosts existing in the network.
+ */
private class MeshInstaller implements Runnable {
@@ -129,15 +182,295 @@
+ /**
+ * Randomly installs and withdraws intents.
+ */
+ private class RandomInstaller implements Runnable {
+ private final boolean isLocal;
+ private final Set<Host> hosts;
+ private final Random random = new Random(System.currentTimeMillis());
+ private Set<HostPair> uninstalledOrWithdrawn;
+ private Set<HostPair> installed;
+ private CountDownLatch latch;
+ //used to wait on a batch to be processed.
+ private static final int ITERATIONMAX = 50000000;
+ public RandomInstaller(Optional<JsonNode> runParams) {
+ /*
+ Check if we have params and honour them. Otherwise
+ set defaults to processing only local stuff and
+ all local hosts.
+ */
+ if (runParams.isPresent()) {
+ JsonNode node = runParams.get();
+ isLocal = node.get("local").asBoolean();
+ hosts = node.get("hosts") == null ? Sets.newHashSet(hostService.getHosts()) :
+ constructHostIds(node.get("hosts").elements());
+ } else {
+ isLocal = true;
+ hosts = Sets.newHashSet(hostService.getHosts());
+ }
+ //construct list of intents.
+ installed = Sets.newHashSet();
+ if (isLocal) {
+ uninstalledOrWithdrawn = buildPairs(pruneHostsByMasterShip());
+ } else {
+ uninstalledOrWithdrawn = buildPairs(hosts);
+ }
+ }
+ private Set<Host> constructHostIds(Iterator<JsonNode> elements) {
+ Set<Host> hostIds = Sets.newHashSet();
+ JsonNode n;
+ while (elements.hasNext()) {
+ n = elements.next();
+ hostIds.add(hostService.getHost(HostId.hostId(n.textValue())));
+ }
+ return hostIds;
+ }
+ @Override
+ public void run() {
+ if (!randomWorker.isShutdown()) {
+ randomize();
+ latch = new CountDownLatch(1);
+ try {
+ trackIntents();
+ } catch (InterruptedException e) {
+ shutdown();
+ }
+ }
+ }
+ /**
+ * Check whether the previously submitted batch is in progress
+ * and if yes submit the next one. If things hang, wait for at
+ * most 5 seconds and bail.
+ * @throws InterruptedException if the thread go interupted
+ */
+ private void trackIntents() throws InterruptedException {
+ int count = 0;
+ while (!latch.await(100, TimeUnit.NANOSECONDS)) {
+ if (intentBatchService.getPendingOperations().isEmpty()) {
+ latch.countDown();
+ }
+ count++;
+ if (count > ITERATIONMAX) {
+ log.warn("A batch is stuck processing. current : {}" +
+ ", pending : {}",
+ intentBatchService.getCurrentOperations(),
+ intentBatchService.getPendingOperations());
+ shutdownAndAwaitTermination(randomWorker);
+ }
+ }
+ //if everyting is good proceed.
+ if (!randomWorker.isShutdown()) {
+ randomWorker.execute(this);
+ }
+ }
+ public void shutdown() {
+ log.warn("Shutting down random installer!");
+ cleanUp();
+ }
+ /**
+ * Shuffle the uninstalled and installed list (separately) and select
+ * a random number of them and install or uninstall them respectively.
+ */
+ private void randomize() {
+ List<HostPair> hostList = new LinkedList<>(uninstalledOrWithdrawn);
+ Collections.shuffle(hostList);
+ List<HostPair> toInstall = hostList.subList(0,
+ random.nextInt(hostList.size() - 1));
+ List<HostPair> toRemove;
+ if (!installed.isEmpty()) {
+ hostList = new LinkedList<>(installed);
+ Collections.shuffle(hostList);
+ toRemove = hostList.subList(0,
+ random.nextInt(hostList.size() - 1));
+ uninstallIntents(toRemove);
+ }
+ installIntents(toInstall);
+ }
+ private void installIntents(List<HostPair> toInstall) {
+ IntentOperations.Builder builder = IntentOperations.builder();
+ for (HostPair pair : toInstall) {
+ installed.add(pair);
+ uninstalledOrWithdrawn.remove(pair);
+ builder.addSubmitOperation(pair.h2hIntent());
+ }
+ intentBatchService.addIntentOperations(builder.build());
+ }
+ private void uninstallIntents(Collection<HostPair> toRemove) {
+ IntentOperations.Builder builder = IntentOperations.builder();
+ for (HostPair pair : toRemove) {
+ installed.remove(pair);
+ uninstalledOrWithdrawn.add(pair);
+ builder.addWithdrawOperation(pair.h2hIntent().id());
+ }
+ intentBatchService.addIntentOperations(builder.build());
+ }
+ /**
+ * Take everything and remove it all.
+ */
+ private void cleanUp() {
+ List<HostPair> allPairs = Lists.newArrayList(installed);
+ allPairs.addAll(uninstalledOrWithdrawn);
+ IntentOperations.Builder builder = IntentOperations.builder();
+ for (HostPair pair : allPairs) {
+ builder.addWithdrawOperation(pair.h2hIntent().id());
+ }
+ intentBatchService.addIntentOperations(builder.build());
+ }
+ private Set<HostPair> buildPairs(Set<Host> hosts) {
+ Set<HostPair> pairs = Sets.newHashSet();
+ Iterator<Host> it = Sets.newHashSet(hosts).iterator();
+ while (it.hasNext()) {
+ Host src = it.next();
+ it.remove();
+ for (Host dst : hosts) {
+ pairs.add(new HostPair(src, dst));
+ }
+ }
+ return pairs;
+ }
+ private Set<Host> pruneHostsByMasterShip() {
+ return FluentIterable.from(hosts)
+ .filter(hasLocalMaster())
+ .toSet();
+ }
+ private Predicate<? super Host> hasLocalMaster() {
+ return new Predicate<Host>() {
+ @Override
+ public boolean apply(Host host) {
+ return mastershipService.getLocalRole(
+ host.location().deviceId()).equals(MastershipRole.MASTER);
+ }
+ };
+ }
+ /**
+ * Simple class representing a pair of hosts and precomputes the associated
+ * h2h intent.
+ */
+ private class HostPair {
+ private final Host src;
+ private final Host dst;
+ private final TrafficSelector selector = DefaultTrafficSelector.builder().build();
+ private final TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
+ private final List<Constraint> constraint = Lists.newArrayList();
+ private final HostToHostIntent intent;
+ public HostPair(Host src, Host dst) {
+ this.src = src;
+ this.dst = dst;
+ this.intent = new HostToHostIntent(appId, src.id(), dst.id(),
+ selector, treatment, constraint);
+ }
+ public HostToHostIntent h2hIntent() {
+ return intent;
+ }
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HostPair hostPair = (HostPair) o;
+ return Objects.equals(src, hostPair.src) &&
+ Objects.equals(dst, hostPair.dst);
+ }
+ @Override
+ public int hashCode() {
+ return Objects.hash(src, dst);
+ }
+ }
+ }
+ /**
+ * Remove anything that is running and clear it all out.
+ */
private class UnInstaller implements Runnable {
public void run() {
+ if (!existingIntents.isEmpty()) {
+ clearExistingIntents();
+ }
+ if (randomWorker != null && !randomWorker.isShutdown()) {
+ shutdownAndAwaitTermination(randomWorker);
+ randomInstaller.shutdown();
+ }
+ }
+ private void clearExistingIntents() {
for (Intent i : existingIntents) {
+ existingIntents.clear();
+ /**
+ * Shutdown a pool cleanly if possible.
+ *
+ * @param pool an executorService
+ */
+ private void shutdownAndAwaitTermination(ExecutorService pool) {
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ log.error("Pool did not terminate");
+ }
+ }
+ } catch (Exception ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
diff --git a/apps/demo/src/main/java/org/onlab/onos/demo/DemoResource.java b/apps/demo/src/main/java/org/onlab/onos/demo/DemoResource.java
index f533072..91e8b62 100644
--- a/apps/demo/src/main/java/org/onlab/onos/demo/DemoResource.java
+++ b/apps/demo/src/main/java/org/onlab/onos/demo/DemoResource.java
@@ -13,6 +13,7 @@
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Optional;
* Rest API for demos.
@@ -33,10 +34,11 @@
.entity("Expected type field containing either mesh or random.").build();
DemoAPI.InstallType type = DemoAPI.InstallType.valueOf(
DemoAPI demo = get(DemoAPI.class);
- demo.setup(type);
+ demo.setup(type, Optional.ofNullable(cfg.get("runParams")));
return Response.ok(mapper.createObjectNode().toString()).build();
diff --git a/features/features.xml b/features/features.xml
index 2a5e9cb..84419ed 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -231,6 +231,7 @@
<feature name="onos-app-demo" version="1.0.0"
description="ONOS demo applications">
+ <bundle>mvn:org.onlab.onos/onlab-misc/1.0.0-SNAPSHOT</bundle>