started work to parrallize compliation and "installation"

Change-Id: I2e7f03b9f8074ef6f9e1c186009ed3cad6980b49
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java b/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
index 87e786c..55892c3 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
@@ -82,6 +82,7 @@
      */
     @Deprecated
     List<Operation> batchWrite(BatchWrite batch);
+
     default void write(IntentData newData) {}
     default void batchWrite(Iterable<IntentData> updates) {}
 
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
index 05efb22..6551c85 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -66,6 +67,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.util.Tools.namedThreads;
 import static org.onosproject.net.intent.IntentState.FAILED;
 import static org.onosproject.net.intent.IntentState.INSTALL_REQ;
@@ -115,7 +117,8 @@
     protected FlowRuleService flowRuleService;
 
 
-    private ExecutorService executor;
+    private ExecutorService batchExecutor;
+    private ExecutorService workerExecutor;
 
     private final IntentStoreDelegate delegate = new InternalStoreDelegate();
     private final TopologyChangeDelegate topoDelegate = new InternalTopoChangeDelegate();
@@ -129,7 +132,8 @@
         store.setDelegate(delegate);
         trackerService.setDelegate(topoDelegate);
         eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
-        executor = newFixedThreadPool(NUM_THREADS, namedThreads("onos-intent-%d"));
+        batchExecutor = newSingleThreadExecutor(namedThreads("onos-intent-batch"));
+        workerExecutor = newFixedThreadPool(NUM_THREADS, namedThreads("onos-intent-worker-%d"));
         idGenerator = coreService.getIdGenerator("intent-ids");
         Intent.bindIdGenerator(idGenerator);
         log.info("Started");
@@ -140,7 +144,7 @@
         store.unsetDelegate(delegate);
         trackerService.unsetDelegate(topoDelegate);
         eventDispatcher.removeSink(IntentEvent.class);
-        executor.shutdown();
+        batchExecutor.shutdown();
         Intent.unbindIdGenerator(idGenerator);
         log.info("Stopped");
     }
@@ -472,9 +476,15 @@
         @Override
         public void run() {
             try {
-                List<IntentUpdate> updates = createIntentUpdates();
+                // 1. wrap each intentdata in a runnable and submit
+                List<Future<IntentUpdate>> updates = createIntentUpdates();
+                // TODO
+                // 2. wait for completion of all the work
+                // 3. accumulate results and submit batch write of IntentData to store
+                //    (we can also try to update these individually)
 
-                new IntentBatchApplyFirst(ops, processIntentUpdates(updates), endTime, 0, null).run();
+
+                //new IntentBatchApplyFirst(ops, processIntentUpdates(updates), endTime, 0, null).run();
             } catch (Exception e) {
                 log.error("Error submitting batches:", e);
                 // FIXME incomplete Intents should be cleaned up
@@ -488,33 +498,40 @@
             }
         }
 
-        private List<IntentUpdate> createIntentUpdates() {
+        private List<Future<IntentUpdate>> createIntentUpdates() {
             return ops.stream()
-                    .map(IntentManager.this::createIntentUpdate)
-                    .collect(Collectors.toList());
-        }
-
-        private List<CompletedIntentUpdate> processIntentUpdates(List<IntentUpdate> updates) {
-            // start processing each Intents
-            List<CompletedIntentUpdate> completed = new ArrayList<>();
-            for (IntentUpdate update : updates) {
-                Optional<IntentUpdate> phase = Optional.of(update);
-                IntentUpdate previous = update;
-                while (true) {
-                    if (!phase.isPresent()) {
-                        // FIXME: not type safe cast
-                        completed.add((CompletedIntentUpdate) previous);
-                        break;
-                    }
-                    previous = phase.get();
-                    phase = previous.execute();
-                }
-            }
-
-            return completed;
+                      .map(IntentManager.this::submitIntentData)
+                      .collect(Collectors.toList());
         }
     }
 
+    private Future<IntentUpdate> submitIntentData(IntentData data) {
+         return workerExecutor.submit(new IntentWorker(data));
+    }
+
+    private class IntentWorker implements Callable<IntentUpdate> {
+
+        private final IntentData data;
+
+        private IntentWorker(IntentData data) {
+            this.data = data;
+        }
+
+        @Override
+        public IntentUpdate call() throws Exception {
+            IntentUpdate update = createIntentUpdate(data);
+            Optional<IntentUpdate> currentPhase = Optional.of(update);
+            IntentUpdate previousPhase = update;
+
+            while (currentPhase.isPresent()) {
+                previousPhase = currentPhase.get();
+                currentPhase = previousPhase.execute();
+            }
+            return previousPhase;
+        }
+    }
+
+
     // TODO: better naming
     private class IntentBatchApplyFirst extends IntentBatchPreprocess {
 
@@ -596,7 +613,7 @@
                     retry();
                 } else {
                     // we are not done yet, yield the thread by resubmitting ourselves
-                    executor.submit(new IntentBatchProcessFutures(ops, intentUpdates, endTime, installAttempt, future));
+                    batchExecutor.submit(new IntentBatchProcessFutures(ops, intentUpdates, endTime, installAttempt, future));
                 }
             } catch (Exception e) {
                 log.error("Error submitting batches:", e);
@@ -665,7 +682,7 @@
                     return;
                 }
                 Future<CompletedBatchOperation> future = applyNextBatch(intentUpdates);
-                executor.submit(new IntentBatchProcessFutures(ops, intentUpdates, timeLimit, attempts, future));
+                batchExecutor.submit(new IntentBatchProcessFutures(ops, intentUpdates, timeLimit, attempts, future));
             } else {
                 log.error("Cancelling FlowRuleBatch failed.");
                 abandonShip();
@@ -678,7 +695,8 @@
         public void execute(Collection<IntentData> operations) {
             log.info("Execute {} operation(s).", operations.size());
             log.debug("Execute operations: {}", operations);
-            executor.execute(new IntentBatchPreprocess(operations));
+            batchExecutor.execute(new IntentBatchPreprocess(operations));
+            // TODO ensure that only one batch is in flight at a time
         }
     }
 }
diff --git a/utils/misc/src/main/java/org/onlab/util/Accumulator.java b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
index dd251ec..897bddf 100644
--- a/utils/misc/src/main/java/org/onlab/util/Accumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
@@ -38,4 +38,5 @@
      */
     void processEvents(List<T> events);
 
+    //TODO consider a blocking version that required consumer participation
 }