random demo intent installer

Change-Id: I1ac2f5a6b7efbc7c940c6d7c371af4f5befcd676

using a timer to pace ourselves

Change-Id: Ia58698f950bbbc958ad002ed56dfe54b90f317ab

all good blasts intents

Change-Id: Ia85df5ad211c01d22d4088403d789b3d6a2292f7

clean up

Change-Id: I1a6dde05f57d0e4866d3255fc28836dfa7e7c190
diff --git a/apps/demo/pom.xml b/apps/demo/pom.xml
index fbc7f91..fe2d916 100644
--- a/apps/demo/pom.xml
+++ b/apps/demo/pom.xml
@@ -109,7 +109,9 @@
                             com.google.common.*,
                             org.onlab.packet.*,
                             org.onlab.rest.*,
-                            org.onlab.onos.*
+                            org.onlab.onos.*,
+                            org.onlab.util.*,
+                            org.jboss.netty.util.*
                         </Import-Package>
                         <Web-ContextPath>${web.context}</Web-ContextPath>
                     </instructions>
diff --git a/apps/demo/src/main/java/org/onlab/onos/demo/DemoAPI.java b/apps/demo/src/main/java/org/onlab/onos/demo/DemoAPI.java
index 2a267f3..ff877da 100644
--- a/apps/demo/src/main/java/org/onlab/onos/demo/DemoAPI.java
+++ b/apps/demo/src/main/java/org/onlab/onos/demo/DemoAPI.java
@@ -1,5 +1,9 @@
 package org.onlab.onos.demo;
 
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.util.Optional;
+
 /**
  * Simple demo api interface.
  */
@@ -11,7 +15,7 @@
      * Installs intents based on the installation type.
      * @param type the installation type.
      */
-    void setup(InstallType type);
+    void setup(InstallType type, Optional<JsonNode> runParams);
 
     /**
      * Uninstalls all existing intents.
diff --git a/apps/demo/src/main/java/org/onlab/onos/demo/DemoInstaller.java b/apps/demo/src/main/java/org/onlab/onos/demo/DemoInstaller.java
index 758f628..de66ee5 100644
--- a/apps/demo/src/main/java/org/onlab/onos/demo/DemoInstaller.java
+++ b/apps/demo/src/main/java/org/onlab/onos/demo/DemoInstaller.java
@@ -15,7 +15,11 @@
  */
 package org.onlab.onos.demo;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -23,9 +27,14 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+
+import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.core.ApplicationId;
 import org.onlab.onos.core.CoreService;
+import org.onlab.onos.mastership.MastershipService;
 import org.onlab.onos.net.Host;
+import org.onlab.onos.net.HostId;
+import org.onlab.onos.net.MastershipRole;
 import org.onlab.onos.net.flow.DefaultTrafficSelector;
 import org.onlab.onos.net.flow.DefaultTrafficTreatment;
 import org.onlab.onos.net.flow.TrafficSelector;
@@ -34,15 +43,27 @@
 import org.onlab.onos.net.intent.Constraint;
 import org.onlab.onos.net.intent.HostToHostIntent;
 import org.onlab.onos.net.intent.Intent;
+import org.onlab.onos.net.intent.IntentBatchService;
+import org.onlab.onos.net.intent.IntentOperations;
 import org.onlab.onos.net.intent.IntentService;
 import org.slf4j.Logger;
 
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -64,17 +85,31 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected HostService hostService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected IntentBatchService intentBatchService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
     private ExecutorService worker;
 
+    private ExecutorService randomWorker;
+
     private ApplicationId appId;
 
     private final Set<Intent> existingIntents = new HashSet<>();
+    private RandomInstaller randomInstaller;
 
 
 
     @Activate
     public void activate() {
-        appId = coreService.registerApplication("org.onlab.onos.demo.installer");
+        String nodeId = clusterService.getLocalNode().ip().toString();
+        appId = coreService.registerApplication("org.onlab.onos.demo.installer."
+                                                        + nodeId);
         worker = Executors.newFixedThreadPool(1,
                                               new ThreadFactoryBuilder()
                                                       .setNameFormat("demo-app-worker")
@@ -84,19 +119,34 @@
 
     @Deactivate
     public void deactivate() {
-        worker.shutdownNow();
+        shutdownAndAwaitTermination(worker);
+        if (!randomWorker.isShutdown()) {
+            shutdownAndAwaitTermination(randomWorker);
+        }
         log.info("Stopped");
     }
 
     @Override
-    public void setup(InstallType type) {
+    public void setup(InstallType type, Optional<JsonNode> runParams) {
         switch (type) {
             case MESH:
                 log.debug("Installing mesh intents");
                 worker.execute(new MeshInstaller());
                 break;
             case RANDOM:
-                throw new IllegalArgumentException("Not yet implemented.");
+                //check that we do not have a random installer running
+                if (randomWorker == null || randomWorker.isShutdown()) {
+                    randomWorker = Executors.newFixedThreadPool(1,
+                                                   new ThreadFactoryBuilder()
+                                                           .setNameFormat("random-worker")
+                                                           .build());
+                    log.debug("Installing random sequence of intents");
+                    randomInstaller = new RandomInstaller(runParams);
+                    randomWorker.execute(randomInstaller);
+                } else {
+                    log.warn("Random installer is already running");
+                }
+                break;
             default:
                 throw new IllegalArgumentException("What is it you want exactly?");
         }
@@ -108,6 +158,9 @@
     }
 
 
+    /**
+     * Simply installs a mesh of intents from all the hosts existing in the network.
+     */
     private class MeshInstaller implements Runnable {
 
         @Override
@@ -129,15 +182,295 @@
         }
     }
 
+    /**
+     * Randomly installs and withdraws intents.
+     */
+    private class RandomInstaller implements Runnable {
 
+        private final boolean isLocal;
+        private final Set<Host> hosts;
+
+        private final Random random = new Random(System.currentTimeMillis());
+
+        private Set<HostPair> uninstalledOrWithdrawn;
+        private Set<HostPair> installed;
+
+        private CountDownLatch latch;
+
+        //used to wait on a batch to be processed.
+        private static final int ITERATIONMAX = 50000000;
+
+
+        public RandomInstaller(Optional<JsonNode> runParams) {
+            /*
+                Check if we have params and honour them. Otherwise
+                    set defaults to processing only local stuff and
+                    all local hosts.
+             */
+            if (runParams.isPresent()) {
+                JsonNode node = runParams.get();
+                isLocal = node.get("local").asBoolean();
+                hosts = node.get("hosts") == null ? Sets.newHashSet(hostService.getHosts()) :
+                        constructHostIds(node.get("hosts").elements());
+            } else {
+                isLocal = true;
+                hosts = Sets.newHashSet(hostService.getHosts());
+            }
+
+            //construct list of intents.
+            installed = Sets.newHashSet();
+            if (isLocal) {
+                uninstalledOrWithdrawn = buildPairs(pruneHostsByMasterShip());
+            } else {
+                uninstalledOrWithdrawn = buildPairs(hosts);
+            }
+
+        }
+
+        private Set<Host> constructHostIds(Iterator<JsonNode> elements) {
+            Set<Host> hostIds = Sets.newHashSet();
+            JsonNode n;
+            while (elements.hasNext()) {
+                n = elements.next();
+                hostIds.add(hostService.getHost(HostId.hostId(n.textValue())));
+            }
+            return hostIds;
+        }
+
+        @Override
+        public void run() {
+            if (!randomWorker.isShutdown()) {
+                randomize();
+                latch = new CountDownLatch(1);
+                try {
+                    trackIntents();
+                } catch (InterruptedException e) {
+                    shutdown();
+                }
+            }
+
+        }
+
+
+        /**
+         *   Check whether the previously submitted batch is in progress
+         *   and if yes submit the next one. If things hang, wait for at
+         *   most 5 seconds and bail.
+         * @throws InterruptedException if the thread go interupted
+         */
+        private void trackIntents() throws InterruptedException {
+            int count = 0;
+            while (!latch.await(100, TimeUnit.NANOSECONDS)) {
+                if (intentBatchService.getPendingOperations().isEmpty()) {
+                    latch.countDown();
+                }
+                count++;
+                if (count > ITERATIONMAX) {
+                    log.warn("A batch is stuck processing. current : {}" +
+                                     ", pending : {}",
+                             intentBatchService.getCurrentOperations(),
+                             intentBatchService.getPendingOperations());
+                    shutdownAndAwaitTermination(randomWorker);
+                }
+            }
+            //if everyting is good proceed.
+            if (!randomWorker.isShutdown()) {
+                randomWorker.execute(this);
+            }
+
+        }
+
+        public void shutdown() {
+            log.warn("Shutting down random installer!");
+            cleanUp();
+        }
+
+
+        /**
+         *   Shuffle the uninstalled and installed list (separately) and select
+         *   a random number of them and install or uninstall them respectively.
+         */
+        private void randomize() {
+            List<HostPair> hostList = new LinkedList<>(uninstalledOrWithdrawn);
+            Collections.shuffle(hostList);
+            List<HostPair> toInstall = hostList.subList(0,
+                                                        random.nextInt(hostList.size() - 1));
+            List<HostPair> toRemove;
+            if (!installed.isEmpty()) {
+                hostList = new LinkedList<>(installed);
+                Collections.shuffle(hostList);
+                toRemove = hostList.subList(0,
+                                            random.nextInt(hostList.size() - 1));
+                uninstallIntents(toRemove);
+            }
+            installIntents(toInstall);
+
+        }
+
+        private void installIntents(List<HostPair> toInstall) {
+            IntentOperations.Builder builder = IntentOperations.builder();
+            for (HostPair pair : toInstall) {
+                installed.add(pair);
+                uninstalledOrWithdrawn.remove(pair);
+                builder.addSubmitOperation(pair.h2hIntent());
+            }
+            intentBatchService.addIntentOperations(builder.build());
+        }
+
+        private void uninstallIntents(Collection<HostPair> toRemove) {
+            IntentOperations.Builder builder = IntentOperations.builder();
+            for (HostPair pair : toRemove) {
+                installed.remove(pair);
+                uninstalledOrWithdrawn.add(pair);
+                builder.addWithdrawOperation(pair.h2hIntent().id());
+            }
+            intentBatchService.addIntentOperations(builder.build());
+        }
+
+        /**
+         *  Take everything and remove it all.
+         */
+        private void cleanUp() {
+            List<HostPair> allPairs = Lists.newArrayList(installed);
+            allPairs.addAll(uninstalledOrWithdrawn);
+            IntentOperations.Builder builder = IntentOperations.builder();
+            for (HostPair pair : allPairs) {
+                builder.addWithdrawOperation(pair.h2hIntent().id());
+            }
+            intentBatchService.addIntentOperations(builder.build());
+        }
+
+
+        private Set<HostPair> buildPairs(Set<Host> hosts) {
+            Set<HostPair> pairs = Sets.newHashSet();
+            Iterator<Host> it = Sets.newHashSet(hosts).iterator();
+            while (it.hasNext()) {
+                Host src = it.next();
+                it.remove();
+                for (Host dst : hosts) {
+                    pairs.add(new HostPair(src, dst));
+                }
+            }
+            return pairs;
+        }
+
+        private Set<Host> pruneHostsByMasterShip() {
+            return FluentIterable.from(hosts)
+                    .filter(hasLocalMaster())
+                    .toSet();
+
+        }
+
+        private Predicate<? super Host> hasLocalMaster() {
+            return new Predicate<Host>() {
+                @Override
+                public boolean apply(Host host) {
+                    return mastershipService.getLocalRole(
+                            host.location().deviceId()).equals(MastershipRole.MASTER);
+                }
+            };
+        }
+
+
+        /**
+         * Simple class representing a pair of hosts and precomputes the associated
+         * h2h intent.
+         */
+        private class HostPair {
+
+            private final Host src;
+            private final Host dst;
+
+            private final TrafficSelector selector = DefaultTrafficSelector.builder().build();
+            private final TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
+            private final List<Constraint> constraint = Lists.newArrayList();
+            private final HostToHostIntent intent;
+
+            public HostPair(Host src, Host dst) {
+                this.src = src;
+                this.dst = dst;
+                this.intent = new HostToHostIntent(appId, src.id(), dst.id(),
+                                                   selector, treatment, constraint);
+            }
+
+            public HostToHostIntent h2hIntent() {
+                return intent;
+            }
+
+            @Override
+            public boolean equals(Object o) {
+                if (this == o) {
+                    return true;
+                }
+                if (o == null || getClass() != o.getClass()) {
+                    return false;
+                }
+
+                HostPair hostPair = (HostPair) o;
+
+                return Objects.equals(src, hostPair.src) &&
+                        Objects.equals(dst, hostPair.dst);
+
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hash(src, dst);
+            }
+
+
+        }
+
+    }
+
+    /**
+     * Remove anything that is running and clear it all out.
+     */
     private class UnInstaller implements Runnable {
         @Override
         public void run() {
+            if (!existingIntents.isEmpty()) {
+                clearExistingIntents();
+            }
+
+            if (randomWorker != null && !randomWorker.isShutdown()) {
+                shutdownAndAwaitTermination(randomWorker);
+                randomInstaller.shutdown();
+            }
+        }
+
+        private void clearExistingIntents() {
             for (Intent i : existingIntents) {
                 intentService.withdraw(i);
             }
+            existingIntents.clear();
         }
     }
+
+    /**
+     * Shutdown a pool cleanly if possible.
+     *
+     * @param pool an executorService
+     */
+    private void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                    log.error("Pool did not terminate");
+                }
+            }
+        } catch (Exception ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
 }
 
 
diff --git a/apps/demo/src/main/java/org/onlab/onos/demo/DemoResource.java b/apps/demo/src/main/java/org/onlab/onos/demo/DemoResource.java
index f533072..91e8b62 100644
--- a/apps/demo/src/main/java/org/onlab/onos/demo/DemoResource.java
+++ b/apps/demo/src/main/java/org/onlab/onos/demo/DemoResource.java
@@ -13,6 +13,7 @@
 import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Optional;
 
 /**
  * Rest API for demos.
@@ -33,10 +34,11 @@
                     .entity("Expected type field containing either mesh or random.").build();
         }
 
+
         DemoAPI.InstallType type = DemoAPI.InstallType.valueOf(
                 cfg.get("type").asText().toUpperCase());
         DemoAPI demo = get(DemoAPI.class);
-        demo.setup(type);
+        demo.setup(type, Optional.ofNullable(cfg.get("runParams")));
 
         return Response.ok(mapper.createObjectNode().toString()).build();
     }
diff --git a/features/features.xml b/features/features.xml
index 2a5e9cb..84419ed 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -231,6 +231,7 @@
     <feature name="onos-app-demo" version="1.0.0"
              description="ONOS demo applications">
         <feature>onos-api</feature>
+        <bundle>mvn:org.onlab.onos/onlab-misc/1.0.0-SNAPSHOT</bundle>
         <bundle>mvn:org.onlab.onos/onos-app-demo/1.0.0-SNAPSHOT</bundle>
     </feature>