Initial implementation of distributed intent batch queue
Change-Id: I7ffed03651569ade1be1e8dca905bfaf369b7e03
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
index da2fe70..1cb3210 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
@@ -33,6 +33,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.core.CoreService;
import org.onlab.onos.core.IdGenerator;
import org.onlab.onos.event.AbstractListenerRegistry;
@@ -151,20 +152,22 @@
@Override
public void submit(Intent intent) {
checkNotNull(intent, INTENT_NULL);
- execute(IntentOperations.builder().addSubmitOperation(intent).build());
+ execute(IntentOperations.builder(intent.appId())
+ .addSubmitOperation(intent).build()); // operations are built for the intent's own application id
}
@Override
public void withdraw(Intent intent) {
checkNotNull(intent, INTENT_NULL);
- execute(IntentOperations.builder().addWithdrawOperation(intent.id()).build());
+ execute(IntentOperations.builder(intent.appId())
+ .addWithdrawOperation(intent.id()).build()); // built for the application id of the withdrawn intent
}
@Override
public void replace(IntentId oldIntentId, Intent newIntent) {
checkNotNull(oldIntentId, INTENT_ID_NULL);
checkNotNull(newIntent, INTENT_NULL);
- execute(IntentOperations.builder()
+ execute(IntentOperations.builder(newIntent.appId()) // application id comes from the replacement intent
.addReplaceOperation(oldIntentId, newIntent)
.build());
}
@@ -489,26 +492,50 @@
}
}
+    /** Groups update operations per owning application and executes the batches led by this instance. */
+    private void buildAndSubmitBatches(Iterable<IntentId> intentIds, boolean compileAllFailed) {
+        Map<ApplicationId, IntentOperations.Builder> batches = Maps.newHashMap();
+        // Attempt recompilation of the specified intents first; ids with no stored intent are skipped.
+        for (IntentId id : intentIds) {
+            Intent intent = store.getIntent(id);
+            if (intent != null) {
+                batchFor(batches, intent.appId()).addUpdateOperation(id);
+            }
+        }
+        if (compileAllFailed) {
+            // If required, compile all currently failed intents.
+            // NOTE(review): a FAILED intent also listed in intentIds gets a second update operation - confirm intended.
+            for (Intent intent : getIntents()) {
+                if (getIntentState(intent.id()) == FAILED) {
+                    batchFor(batches, intent.appId()).addUpdateOperation(intent.id());
+                }
+            }
+        }
+        // Execute only the batches for which batchService reports this instance as the local leader.
+        for (Map.Entry<ApplicationId, IntentOperations.Builder> batch : batches.entrySet()) {
+            if (batchService.isLocalLeader(batch.getKey())) {
+                execute(batch.getValue().build());
+            }
+        }
+    }
+
+    /** Returns the builder registered for appId, creating and registering it on first use. */
+    private static IntentOperations.Builder batchFor(
+            Map<ApplicationId, IntentOperations.Builder> batches, ApplicationId appId) {
+        IntentOperations.Builder builder = batches.get(appId);
+        if (builder == null) {
+            builder = IntentOperations.builder(appId);
+            batches.put(appId, builder);
+        }
+        return builder;
+    }
+
// Topology change delegate
private class InternalTopoChangeDelegate implements TopologyChangeDelegate {
@Override
public void triggerCompile(Iterable<IntentId> intentIds,
boolean compileAllFailed) {
- // Attempt recompilation of the specified intents first.
- IntentOperations.Builder builder = IntentOperations.builder();
- for (IntentId id : intentIds) {
- builder.addUpdateOperation(id);
- }
-
- if (compileAllFailed) {
- // If required, compile all currently failed intents.
- for (Intent intent : getIntents()) {
- if (getIntentState(intent.id()) == FAILED) {
- builder.addUpdateOperation(intent.id());
- }
- }
- }
- execute(builder.build());
+ buildAndSubmitBatches(intentIds, compileAllFailed); // groups by application, executes locally-led batches
}
}