Initial implementation of distributed intent batch queue

Change-Id: I7ffed03651569ade1be1e8dca905bfaf369b7e03
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
index da2fe70..1cb3210 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
@@ -33,6 +33,7 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.core.ApplicationId;
 import org.onlab.onos.core.CoreService;
 import org.onlab.onos.core.IdGenerator;
 import org.onlab.onos.event.AbstractListenerRegistry;
@@ -151,20 +152,22 @@
     @Override
     public void submit(Intent intent) {
         checkNotNull(intent, INTENT_NULL);
-        execute(IntentOperations.builder().addSubmitOperation(intent).build());
+        execute(IntentOperations.builder(intent.appId())
+                        .addSubmitOperation(intent).build());
     }
 
     @Override
     public void withdraw(Intent intent) {
         checkNotNull(intent, INTENT_NULL);
-        execute(IntentOperations.builder().addWithdrawOperation(intent.id()).build());
+        execute(IntentOperations.builder(intent.appId())
+                        .addWithdrawOperation(intent.id()).build());
     }
 
     @Override
     public void replace(IntentId oldIntentId, Intent newIntent) {
         checkNotNull(oldIntentId, INTENT_ID_NULL);
         checkNotNull(newIntent, INTENT_NULL);
-        execute(IntentOperations.builder()
+        execute(IntentOperations.builder(newIntent.appId())
                 .addReplaceOperation(oldIntentId, newIntent)
                 .build());
     }
@@ -489,26 +492,50 @@
         }
     }
 
+    private void buildAndSubmitBatches(Iterable<IntentId> intentIds,
+                                       boolean compileAllFailed) {
+        Map<ApplicationId, IntentOperations.Builder> batches = Maps.newHashMap();
+        // Attempt recompilation of the specified intents first.
+        for (IntentId id : intentIds) {
+            Intent intent = store.getIntent(id);
+            if (intent == null) {
+                continue;
+            }
+            IntentOperations.Builder builder = batches.get(intent.appId());
+            if (builder == null) {
+                builder = IntentOperations.builder(intent.appId());
+                batches.put(intent.appId(), builder);
+            }
+            builder.addUpdateOperation(id);
+        }
+
+        if (compileAllFailed) {
+            // If required, compile all currently failed intents.
+            for (Intent intent : getIntents()) {
+                if (getIntentState(intent.id()) == FAILED) {
+                    IntentOperations.Builder builder = batches.get(intent.appId());
+                    if (builder == null) {
+                        builder = IntentOperations.builder(intent.appId());
+                        batches.put(intent.appId(), builder);
+                    }
+                    builder.addUpdateOperation(intent.id());
+                }
+            }
+        }
+
+        for (ApplicationId appId : batches.keySet()) {
+            if (batchService.isLocalLeader(appId)) {
+                execute(batches.get(appId).build());
+            }
+        }
+    }
+
     // Topology change delegate
     private class InternalTopoChangeDelegate implements TopologyChangeDelegate {
         @Override
         public void triggerCompile(Iterable<IntentId> intentIds,
                                    boolean compileAllFailed) {
-            // Attempt recompilation of the specified intents first.
-            IntentOperations.Builder builder = IntentOperations.builder();
-            for (IntentId id : intentIds) {
-                builder.addUpdateOperation(id);
-            }
-
-            if (compileAllFailed) {
-                // If required, compile all currently failed intents.
-                for (Intent intent : getIntents()) {
-                    if (getIntentState(intent.id()) == FAILED) {
-                        builder.addUpdateOperation(intent.id());
-                    }
-                }
-            }
-            execute(builder.build());
+            buildAndSubmitBatches(intentIds, compileAllFailed);
         }
     }