started work to parrallize compliation and "installation"
Change-Id: I2e7f03b9f8074ef6f9e1c186009ed3cad6980b49
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java b/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
index 87e786c..55892c3 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
@@ -82,6 +82,7 @@
*/
@Deprecated
List<Operation> batchWrite(BatchWrite batch);
+
default void write(IntentData newData) {}
default void batchWrite(Iterable<IntentData> updates) {}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
index 05efb22..6551c85 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -66,6 +67,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.namedThreads;
import static org.onosproject.net.intent.IntentState.FAILED;
import static org.onosproject.net.intent.IntentState.INSTALL_REQ;
@@ -115,7 +117,8 @@
protected FlowRuleService flowRuleService;
- private ExecutorService executor;
+ private ExecutorService batchExecutor;
+ private ExecutorService workerExecutor;
private final IntentStoreDelegate delegate = new InternalStoreDelegate();
private final TopologyChangeDelegate topoDelegate = new InternalTopoChangeDelegate();
@@ -129,7 +132,8 @@
store.setDelegate(delegate);
trackerService.setDelegate(topoDelegate);
eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
- executor = newFixedThreadPool(NUM_THREADS, namedThreads("onos-intent-%d"));
+ batchExecutor = newSingleThreadExecutor(namedThreads("onos-intent-batch"));
+ workerExecutor = newFixedThreadPool(NUM_THREADS, namedThreads("onos-intent-worker-%d"));
idGenerator = coreService.getIdGenerator("intent-ids");
Intent.bindIdGenerator(idGenerator);
log.info("Started");
@@ -140,7 +144,7 @@
store.unsetDelegate(delegate);
trackerService.unsetDelegate(topoDelegate);
eventDispatcher.removeSink(IntentEvent.class);
- executor.shutdown();
+ batchExecutor.shutdown();
Intent.unbindIdGenerator(idGenerator);
log.info("Stopped");
}
@@ -472,9 +476,15 @@
@Override
public void run() {
try {
- List<IntentUpdate> updates = createIntentUpdates();
+ // 1. wrap each intentdata in a runnable and submit
+ List<Future<IntentUpdate>> updates = createIntentUpdates();
+ // TODO
+ // 2. wait for completion of all the work
+ // 3. accumulate results and submit batch write of IntentData to store
+ // (we can also try to update these individually)
- new IntentBatchApplyFirst(ops, processIntentUpdates(updates), endTime, 0, null).run();
+
+ //new IntentBatchApplyFirst(ops, processIntentUpdates(updates), endTime, 0, null).run();
} catch (Exception e) {
log.error("Error submitting batches:", e);
// FIXME incomplete Intents should be cleaned up
@@ -488,33 +498,40 @@
}
}
- private List<IntentUpdate> createIntentUpdates() {
+ private List<Future<IntentUpdate>> createIntentUpdates() {
return ops.stream()
- .map(IntentManager.this::createIntentUpdate)
- .collect(Collectors.toList());
- }
-
- private List<CompletedIntentUpdate> processIntentUpdates(List<IntentUpdate> updates) {
- // start processing each Intents
- List<CompletedIntentUpdate> completed = new ArrayList<>();
- for (IntentUpdate update : updates) {
- Optional<IntentUpdate> phase = Optional.of(update);
- IntentUpdate previous = update;
- while (true) {
- if (!phase.isPresent()) {
- // FIXME: not type safe cast
- completed.add((CompletedIntentUpdate) previous);
- break;
- }
- previous = phase.get();
- phase = previous.execute();
- }
- }
-
- return completed;
+ .map(IntentManager.this::submitIntentData)
+ .collect(Collectors.toList());
}
}
+ private Future<IntentUpdate> submitIntentData(IntentData data) {
+ return workerExecutor.submit(new IntentWorker(data));
+ }
+
+ private class IntentWorker implements Callable<IntentUpdate> {
+
+ private final IntentData data;
+
+ private IntentWorker(IntentData data) {
+ this.data = data;
+ }
+
+ @Override
+ public IntentUpdate call() throws Exception {
+ IntentUpdate update = createIntentUpdate(data);
+ Optional<IntentUpdate> currentPhase = Optional.of(update);
+ IntentUpdate previousPhase = update;
+
+ while (currentPhase.isPresent()) {
+ previousPhase = currentPhase.get();
+ currentPhase = previousPhase.execute();
+ }
+ return previousPhase;
+ }
+ }
+
+
// TODO: better naming
private class IntentBatchApplyFirst extends IntentBatchPreprocess {
@@ -596,7 +613,7 @@
retry();
} else {
// we are not done yet, yield the thread by resubmitting ourselves
- executor.submit(new IntentBatchProcessFutures(ops, intentUpdates, endTime, installAttempt, future));
+ batchExecutor.submit(new IntentBatchProcessFutures(ops, intentUpdates, endTime, installAttempt, future));
}
} catch (Exception e) {
log.error("Error submitting batches:", e);
@@ -665,7 +682,7 @@
return;
}
Future<CompletedBatchOperation> future = applyNextBatch(intentUpdates);
- executor.submit(new IntentBatchProcessFutures(ops, intentUpdates, timeLimit, attempts, future));
+ batchExecutor.submit(new IntentBatchProcessFutures(ops, intentUpdates, timeLimit, attempts, future));
} else {
log.error("Cancelling FlowRuleBatch failed.");
abandonShip();
@@ -678,7 +695,8 @@
public void execute(Collection<IntentData> operations) {
log.info("Execute {} operation(s).", operations.size());
log.debug("Execute operations: {}", operations);
- executor.execute(new IntentBatchPreprocess(operations));
+ batchExecutor.execute(new IntentBatchPreprocess(operations));
+ // TODO ensure that only one batch is in flight at a time
}
}
}
diff --git a/utils/misc/src/main/java/org/onlab/util/Accumulator.java b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
index dd251ec..897bddf 100644
--- a/utils/misc/src/main/java/org/onlab/util/Accumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
@@ -38,4 +38,5 @@
*/
void processEvents(List<T> events);
+ //TODO consider a blocking version that required consumer participation
}