IntentManager: use IntentStore batch APIs

Change-Id: Ie60f3e53f48fa6acbcaf5cf6837bdef12b36a98d
diff --git a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIpLeadershipService.java b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIpLeadershipService.java
index 81535f5..360775c 100644
--- a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIpLeadershipService.java
+++ b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIpLeadershipService.java
@@ -159,6 +159,12 @@
     }
 
     @Override
+    public Map<String, Leadership> getLeaderBoard() {
+        throw new UnsupportedOperationException("I don't know what to do." +
+                                                        " I wish you luck.");
+    }
+
+    @Override
     public void addListener(LeadershipEventListener listener) {
         listenerRegistry.addListener(listener);
     }
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/IntentPushTestCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/IntentPushTestCommand.java
index 0decb08..9326221 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/IntentPushTestCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/IntentPushTestCommand.java
@@ -15,10 +15,13 @@
  */
 package org.onlab.onos.cli.net;
 
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
 import org.apache.karaf.shell.commands.Argument;
 import org.apache.karaf.shell.commands.Command;
 import org.onlab.onos.cli.AbstractShellCommand;
+import org.onlab.onos.core.ApplicationId;
+import org.onlab.onos.core.CoreService;
 import org.onlab.onos.net.ConnectPoint;
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.PortNumber;
@@ -61,17 +64,27 @@
               required = true, multiValued = false)
     String egressDeviceString = null;
 
-    @Argument(index = 2, name = "count",
-              description = "Number of intents to push",
-              required = true, multiValued = false)
-    String countString = null;
+
+    @Argument(index = 2, name = "Intents per appId",
+            description = "Number of intents per appId",
+            required = true, multiValued = false)
+    String intentsPerAppId = null;
+
+    @Argument(index = 3, name = "apps",
+            description = "Number of appIds",
+            required = false, multiValued = false)
+    String appIds = null;
+
 
     private IntentService service;
     private CountDownLatch latch;
     private long start, end;
+    private int apps;
+    private int intentsPerApp;
     private int count;
     private boolean add;
 
+
     @Override
     protected void execute() {
         service = get(IntentService.class);
@@ -85,13 +98,18 @@
         PortNumber egressPortNumber = portNumber(getPortNumber(egressDeviceString));
         ConnectPoint egress = new ConnectPoint(egressDeviceId, egressPortNumber);
 
-        count = Integer.parseInt(countString);
+        apps = appIds != null ? Integer.parseInt(appIds) : 1;
+        intentsPerApp = Integer.parseInt(intentsPerAppId);
+
+        count = intentsPerApp * apps;
+
 
         service.addListener(this);
 
+        ArrayListMultimap<Integer, Intent> operations = generateIntents(ingress, egress);
+
         add = true;
         latch = new CountDownLatch(count);
-        List<Intent> operations = generateIntents(ingress, egress);
         submitIntents(operations);
 
         add = false;
@@ -101,36 +119,41 @@
         service.removeListener(this);
     }
 
-    private List<Intent> generateIntents(ConnectPoint ingress, ConnectPoint egress) {
+    private ArrayListMultimap<Integer, Intent> generateIntents(ConnectPoint ingress, ConnectPoint egress) {
         TrafficSelector.Builder selector = DefaultTrafficSelector.builder()
                 .matchEthType(Ethernet.TYPE_IPV4);
         TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
 
-        List<Intent> intents = Lists.newArrayList();
-        for (int i = 1; i <= count; i++) {
-            TrafficSelector s = selector
-                    .matchEthSrc(MacAddress.valueOf(i))
-                    .build();
-            intents.add(new PointToPointIntent(appId(), s, treatment,
-                                               ingress, egress));
+        ArrayListMultimap<Integer, Intent> intents = ArrayListMultimap.create();
+        for (int app = 1; app <= apps; app++) {
+            for (int i = 1; i <= intentsPerApp; i++) {
+                TrafficSelector s = selector
+                        .matchEthSrc(MacAddress.valueOf(i))
+                        .build();
+                intents.put(app, new PointToPointIntent(appId(), s, treatment,
+                                                        ingress, egress));
 
+            }
         }
         return intents;
     }
 
-    private void submitIntents(List<Intent> intents) {
-        IntentOperations.Builder builder = IntentOperations.builder(appId());
-        for (Intent intent : intents) {
-            if (add) {
-                builder.addSubmitOperation(intent);
-            } else {
-                builder.addWithdrawOperation(intent.id());
+    private void submitIntents(ArrayListMultimap<Integer, Intent> intents) {
+        List<IntentOperations> opList = Lists.newArrayList();
+        for (Integer app : intents.keySet()) {
+            IntentOperations.Builder builder = IntentOperations.builder(appId(app));
+            for (Intent intent : intents.get(app)) {
+                if (add) {
+                    builder.addSubmitOperation(intent);
+                } else {
+                    builder.addWithdrawOperation(intent.id());
+                }
             }
+            opList.add(builder.build());
         }
-        IntentOperations ops = builder.build();
 
         start = System.currentTimeMillis();
-        service.execute(ops);
+        opList.forEach(ops -> service.execute(ops));
         try {
             if (latch.await(100 + count * 200, TimeUnit.MILLISECONDS)) {
                 printResults(count);
@@ -148,6 +171,16 @@
         print("Time to %s %d intents: %d ms", text, count, delta);
     }
 
+
+    /**
+     * Returns application ID for the CLI.
+     *
+     * @return command-line application identifier
+     */
+    protected ApplicationId appId(Integer id) {
+        return get(CoreService.class).registerApplication("org.onlab.onos.cli-" + id);
+    }
+
     /**
      * Extracts the port number portion of the ConnectPoint.
      *
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/LeaderCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/LeaderCommand.java
new file mode 100644
index 0000000..cd5ee09
--- /dev/null
+++ b/cli/src/main/java/org/onlab/onos/cli/net/LeaderCommand.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.onos.cli.net;
+
+import org.apache.karaf.shell.commands.Command;
+import org.onlab.onos.cli.AbstractShellCommand;
+import org.onlab.onos.cluster.Leadership;
+import org.onlab.onos.cluster.LeadershipService;
+
+import java.util.Map;
+
+/**
+ * Prints the leader for every topic.
+ */
+@Command(scope = "onos", name = "leaders",
+        description = "Finds the leader for particular topic.")
+public class LeaderCommand extends AbstractShellCommand {
+
+    @Override
+    protected void execute() {
+        LeadershipService leaderService = get(LeadershipService.class);
+        Map<String, Leadership> leaderBoard = leaderService.getLeaderBoard();
+        print("Topic:\t\tLeader");
+        for (String topic : leaderBoard.keySet()) {
+            print("%s:\t%s", topic, leaderBoard.get(topic).leader().id());
+        }
+    }
+
+}
diff --git a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index b725bcb..9cc21dd 100644
--- a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -258,6 +258,9 @@
         <command>
             <action class="org.onlab.onos.cli.net.AddFlowsCommand"/>
         </command>
+        <command>
+            <action class="org.onlab.onos.cli.net.LeaderCommand"/>
+        </command>
 
         <command>
             <action class="org.onlab.onos.cli.net.WipeOutCommand"/>
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java
index e70cc1f..bb89572 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java
@@ -15,6 +15,8 @@
  */
 package org.onlab.onos.cluster;
 
+import java.util.Map;
+
 /**
  * Service for leader election.
  * Leadership contests are organized around topics. A instance can join the
@@ -43,6 +45,8 @@
      */
     void withdraw(String path);
 
+    Map<String, Leadership> getLeaderBoard();
+
     /**
      * Registers a event listener to be notified of leadership events.
      * @param listener listener that will asynchronously notified of leadership events.
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
index d93432d..2bcb809 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
@@ -15,18 +15,10 @@
  */
 package org.onlab.onos.net.intent.impl;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -46,6 +38,7 @@
 import org.onlab.onos.net.intent.IntentBatchService;
 import org.onlab.onos.net.intent.IntentCompiler;
 import org.onlab.onos.net.intent.IntentEvent;
+import org.onlab.onos.net.intent.IntentEvent.Type;
 import org.onlab.onos.net.intent.IntentException;
 import org.onlab.onos.net.intent.IntentExtensionService;
 import org.onlab.onos.net.intent.IntentId;
@@ -56,23 +49,27 @@
 import org.onlab.onos.net.intent.IntentService;
 import org.onlab.onos.net.intent.IntentState;
 import org.onlab.onos.net.intent.IntentStore;
+import org.onlab.onos.net.intent.IntentStore.BatchWrite;
 import org.onlab.onos.net.intent.IntentStoreDelegate;
 import org.slf4j.Logger;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
-import static org.onlab.onos.net.intent.IntentState.COMPILING;
-import static org.onlab.onos.net.intent.IntentState.FAILED;
-import static org.onlab.onos.net.intent.IntentState.INSTALLED;
-import static org.onlab.onos.net.intent.IntentState.INSTALLING;
-import static org.onlab.onos.net.intent.IntentState.WITHDRAWING;
-import static org.onlab.onos.net.intent.IntentState.WITHDRAWN;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.onlab.onos.net.intent.IntentState.*;
 import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -88,6 +85,8 @@
     public static final String INTENT_NULL = "Intent cannot be null";
     public static final String INTENT_ID_NULL = "Intent ID cannot be null";
 
+    private static final int NUM_THREADS = 12;
+
     // Collections for compiler, installer, and listener are ONOS instance local
     private final ConcurrentMap<Class<? extends Intent>,
             IntentCompiler<? extends Intent>> compilers = new ConcurrentHashMap<>();
@@ -117,7 +116,6 @@
 
 
     private ExecutorService executor;
-    private ExecutorService monitorExecutor;
 
     private final IntentStoreDelegate delegate = new InternalStoreDelegate();
     private final TopologyChangeDelegate topoDelegate = new InternalTopoChangeDelegate();
@@ -130,8 +128,7 @@
         trackerService.setDelegate(topoDelegate);
         batchService.setDelegate(batchDelegate);
         eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
-        executor = newSingleThreadExecutor(namedThreads("onos-intents"));
-        monitorExecutor = newSingleThreadExecutor(namedThreads("onos-intent-monitor"));
+        executor = newFixedThreadPool(NUM_THREADS, namedThreads("onos-intent-monitor"));
         idGenerator = coreService.getIdGenerator("intent-ids");
         Intent.bindIdGenerator(idGenerator);
         log.info("Started");
@@ -144,7 +141,6 @@
         batchService.unsetDelegate(batchDelegate);
         eventDispatcher.removeSink(IntentEvent.class);
         executor.shutdown();
-        monitorExecutor.shutdown();
         Intent.unbindIdGenerator(idGenerator);
         log.info("Stopped");
     }
@@ -288,7 +284,7 @@
     private void executeCompilingPhase(IntentUpdate update) {
         Intent intent = update.newIntent();
         // Indicate that the intent is entering the compiling phase.
-        update.setState(intent, COMPILING);
+        update.setInflightState(intent, COMPILING);
 
         try {
             // Compile the intent into installable derivatives.
@@ -301,7 +297,7 @@
             log.warn("Unable to compile intent {} due to:", intent.id(), e);
 
             // If compilation failed, mark the intent as failed.
-            update.setState(intent, FAILED);
+            update.setInflightState(intent, FAILED);
         }
     }
 
@@ -338,7 +334,7 @@
             return;
         }
         // Indicate that the intent is entering the installing phase.
-        update.setState(update.newIntent(), INSTALLING);
+        update.setInflightState(update.newIntent(), INSTALLING);
 
         List<FlowRuleBatchOperation> batches = Lists.newArrayList();
         for (Intent installable : update.newInstallables()) {
@@ -365,7 +361,7 @@
      */
     private void executeWithdrawingPhase(IntentUpdate update) {
         if (!update.oldIntent().equals(update.newIntent())) {
-            update.setState(update.oldIntent(), WITHDRAWING);
+            update.setInflightState(update.oldIntent(), WITHDRAWING);
         } // else newIntent is FAILED
         update.addBatches(uninstallIntent(update.oldIntent(), update.oldInstallables()));
     }
@@ -405,9 +401,9 @@
                       "Old and New Intent must have equivalent installable intents.");
         if (!update.oldIntent().equals(update.newIntent())) {
             // only set the old intent's state if it is different
-            update.setState(update.oldIntent(), WITHDRAWING);
+            update.setInflightState(update.oldIntent(), WITHDRAWING);
         }
-        update.setState(update.newIntent(), INSTALLING);
+        update.setInflightState(update.newIntent(), INSTALLING);
 
         List<FlowRuleBatchOperation> batches = Lists.newArrayList();
         for (int i = 0; i < update.oldInstallables().size(); i++) {
@@ -427,7 +423,7 @@
                 log.warn("Unable to update intent {} due to:", update.oldIntent().id(), e);
                 //FIXME... we failed. need to uninstall (if same) or revert (if different)
                 trackerService.removeTrackedResources(update.newIntent().id(), newInstallable.resources());
-                update.setState(update.newIntent(), FAILED);
+                update.setInflightState(update.newIntent(), FAILED);
                 batches = uninstallIntent(update.oldIntent(), update.oldInstallables());
             }
         }
@@ -539,6 +535,7 @@
         }
     }
 
+    // TODO move this inside IntentUpdate?
     /**
      * TODO. rename this...
      * @param update intent update
@@ -558,9 +555,9 @@
             executeWithdrawingPhase(update);
         } else {
             if (update.oldIntent() != null &&
-                    !update.oldIntent().equals(update.newIntent())) {
+                !update.oldIntent().equals(update.newIntent())) {
                 // removing failed intent
-                update.setState(update.oldIntent(), WITHDRAWING);
+                update.setInflightState(update.oldIntent(), WITHDRAWING);
             }
 //            if (update.newIntent() != null) {
 //                // TODO assert that next state is failed
@@ -602,13 +599,6 @@
                     newIntent = null;
                     break;
             }
-            // add new intent to store (if required)
-            if (newIntent != null) {
-                IntentEvent event = store.createIntent(newIntent);
-                if (event != null) {
-                    eventDispatcher.post(event);
-                }
-            }
             // fetch the old intent's installables from the store
             if (oldIntent != null) {
                 oldInstallables = store.getInstallableIntents(oldIntent.id());
@@ -617,6 +607,13 @@
             }
         }
 
+        void init(BatchWrite batchWrite) {
+            // add new intent to store (if required)
+            if (newIntent != null) {
+                batchWrite.createIntent(newIntent);
+            }
+        }
+
         Intent oldIntent() {
             return oldIntent;
         }
@@ -635,7 +632,10 @@
 
         void setInstallables(List<Intent> installables) {
             newInstallables = installables;
-            store.setInstallableIntents(newIntent.id(), installables);
+            //FIXME batch this
+
+            //store.setInstallableIntents(newIntent.id(), installables);
+
         }
 
         boolean isComplete() {
@@ -646,42 +646,51 @@
             return !isComplete() ? batches.get(currentBatch) : null;
         }
 
-        void incrementBatch(boolean success) {
-            if (success) { // actually increment
-                if (++currentBatch == batches.size()) {
-                    finalizeStates();
-                }
-            } else { // the current batch has failed, so recompile
-                // remove the current batch and all remaining
-                for (int i = currentBatch; i < batches.size(); i++) {
-                    batches.remove(i);
-                }
-                if (oldIntent != null) {
-                    executeWithdrawingPhase(this); // remove the old intent
-                }
-                if (newIntent != null) {
-                    setState(newIntent, FAILED);
-                    batches.addAll(uninstallIntent(newIntent, newInstallables()));
-                }
-
-                // FIXME: should we try to recompile?
+        List<IntentEvent> batchSuccess(BatchWrite batchWrite) {
+            // move on to next Batch
+            if (++currentBatch == batches.size()) {
+                return finalizeStates(batchWrite);
             }
+            return Collections.emptyList();
+        }
+
+        void batchFailed() {
+
+            // the current batch has failed, so recompile
+            // remove the current batch and all remaining
+            for (int i = currentBatch; i < batches.size(); i++) {
+                batches.remove(i);
+            }
+            if (oldIntent != null) {
+                executeWithdrawingPhase(this); // remove the old intent
+            }
+            if (newIntent != null) {
+                setInflightState(newIntent, FAILED);
+                batches.addAll(uninstallIntent(newIntent, newInstallables()));
+            }
+
+            // FIXME: should we try to recompile?
         }
 
         // FIXME make sure this is called!!!
-        private void finalizeStates() {
+        private List<IntentEvent> finalizeStates(BatchWrite batchWrite) {
+            // events to be triggered on successful write
+            List<IntentEvent> events = new ArrayList<>();
             for (Intent intent : stateMap.keySet()) {
-                switch (getState(intent)) {
+                switch (getInflightState(intent)) {
                     case INSTALLING:
-                        setState(intent, INSTALLED);
+                        batchWrite.setState(intent, INSTALLED);
+                        batchWrite.setInstallableIntents(newIntent.id(), newInstallables);
+                        events.add(new IntentEvent(Type.INSTALLED, intent));
                         break;
                     case WITHDRAWING:
-                        setState(intent, WITHDRAWN);
-                        store.removeInstalledIntents(intent.id());
-                        //store.removeIntent(intent.id()); // FIXME we die a horrible death here
+                        batchWrite.setState(intent, WITHDRAWN);
+                        events.add(new IntentEvent(Type.WITHDRAWN, intent));
+                        batchWrite.removeInstalledIntents(intent.id());
+                        batchWrite.removeIntent(intent.id());
                         break;
                     case FAILED:
-                        store.removeInstalledIntents(intent.id());
+                        batchWrite.removeInstalledIntents(intent.id());
                         break;
 
                     // FALLTHROUGH to default from here
@@ -692,10 +701,11 @@
                     case INSTALLED:
                     default:
                         //FIXME clean this up (we shouldn't ever get here)
-                        log.warn("Bad state: {} for {}", getState(intent), intent);
+                        log.warn("Bad state: {} for {}", getInflightState(intent), intent);
                         break;
                 }
             }
+            return events;
         }
 
         List<FlowRuleBatchOperation> batches() {
@@ -706,11 +716,21 @@
             this.batches.addAll(batches);
         }
 
-        IntentState getState(Intent intent) {
+        IntentState getInflightState(Intent intent) {
             return stateMap.get(intent);
         }
 
-        void setState(Intent intent, IntentState newState) {
+
+        // set transient state during intent update process
+        void setInflightState(Intent intent, IntentState newState) {
+            // This method should be called for
+            // transition to non-parking or Failed only
+            EnumSet<IntentState> nonParkingOrFailed
+                = EnumSet.complementOf(EnumSet.of(SUBMITTED, INSTALLED, WITHDRAWN));
+            if (!nonParkingOrFailed.contains(newState)) {
+                log.error("Unexpected transition to {}", newState);
+            }
+
             // TODO: clean this up, or set to debug
             IntentState oldState = stateMap.get(intent);
             log.debug("intent id: {}, old state: {}, new state: {}",
@@ -721,10 +741,6 @@
             if (event != null) {
                 eventDispatcher.post(event);
             }
-
-            if (newState == WITHDRAWN) {
-                store.removeIntent(intent.id());
-            }
         }
 
         Map<Intent, IntentState> stateMap() {
@@ -741,6 +757,7 @@
         private final IntentOperations ops;
         private final List<IntentUpdate> intentUpdates = Lists.newArrayList();
 
+        // future holding current FlowRuleBatch installation result
         private Future<CompletedBatchOperation> future;
         private long startTime = System.currentTimeMillis();
         private long endTime;
@@ -758,9 +775,27 @@
         }
 
         private void buildIntentUpdates() {
+            BatchWrite batchWrite = store.newBatchWrite();
+
+            // create context and record new request to store
             for (IntentOperation op : ops.operations()) {
                 IntentUpdate update = new IntentUpdate(op);
+                update.init(batchWrite);
                 intentUpdates.add(update);
+            }
+
+            if (!batchWrite.isEmpty()) {
+                store.batchWrite(batchWrite);
+            }
+
+            // start processing each Intents
+            for (IntentUpdate update : intentUpdates) {
+                if (update.newIntent() != null) {
+                    IntentState state = store.getIntentState(update.newIntent().id());
+                    if (state == SUBMITTED) {
+                        eventDispatcher.post(new IntentEvent(Type.SUBMITTED, update.newIntent));
+                    }
+                }
                 processIntentUpdate(update);
             }
             future = applyNextBatch();
@@ -784,8 +819,16 @@
 
         private void updateBatches(CompletedBatchOperation completed) {
             if (completed.isSuccess()) {
+                BatchWrite batchWrite = store.newBatchWrite();
+                List<IntentEvent> events = new ArrayList<>();
                 for (IntentUpdate update : intentUpdates) {
-                    update.incrementBatch(true);
+                    events.addAll(update.batchSuccess(batchWrite));
+                }
+                if (!batchWrite.isEmpty()) {
+                    store.batchWrite(batchWrite);
+                    for (IntentEvent event : events) {
+                        eventDispatcher.post(event);
+                    }
                 }
             } else {
                 // entire batch has been reverted...
@@ -798,7 +841,7 @@
                         installables.addAll(update.oldInstallables());
                         for (Intent intent : installables) {
                             if (intent.id().equals(targetId)) {
-                                update.incrementBatch(false);
+                                update.batchFailed();
                                 break;
                             }
                         }
@@ -834,12 +877,13 @@
                 if (installAttempt++ >= MAX_ATTEMPTS) {
                     log.warn("Install request timed out: {}", ops);
                     for (IntentUpdate update : intentUpdates) {
-                        update.incrementBatch(false);
+                        update.batchFailed();
                     }
                 } // else just resubmit the work
                 future = applyNextBatch();
-                monitorExecutor.submit(this);
+                executor.submit(this);
             } else {
+                log.error("Cancelling FlowRuleBatch failed.");
                 // FIXME
                 // cancel failed... batch is broken; shouldn't happen!
                 // we could manually reverse everything
@@ -859,20 +903,37 @@
                 if (intentUpdates.isEmpty()) {
                     // this should only be called on the first iteration
                     // note: this a "expensive", so it is not done in the constructor
+
+                    // - creates per Intent installation context (IntentUpdate)
+                    // - write Intents to store
+                    // - process (compile, install, etc.) each Intents
+                    // - generate FlowRuleBatch for this phase
                     buildIntentUpdates();
                 }
+
+                // - peek if current FlowRuleBatch is complete
+                // -- If complete OK:
+                //       step each IntentUpdate forward
+                //           If phase left: generate next FlowRuleBatch
+                //           If no more phase: write parking states
+                // -- If complete FAIL:
+                //       Intent which failed: transition Intent to FAILED
+                //       Other Intents: resubmit same FlowRuleBatch for this phase
                 processFutures();
                 if (isComplete()) {
                     // there are no outstanding batches; we are done
                     batchService.removeIntentOperations(ops);
                 } else if (endTime < System.currentTimeMillis()) {
+                    // - cancel current FlowRuleBatch and resubmit again
                     retry();
                 } else {
                     // we are not done yet, yield the thread by resubmitting ourselves
-                    monitorExecutor.submit(this);
+                    executor.submit(this);
                 }
             } catch (Exception e) {
                 log.error("Error submitting batches:", e);
+                // FIXME incomplete Intents should be cleaned up
+                //       (transition to FAILED, etc.)
             }
         }
     }
@@ -883,7 +944,7 @@
             log.info("Execute {} operation(s).", operations.operations().size());
             log.debug("Execute operations: {}", operations.operations());
             //FIXME: perhaps we want to track this task so that we can cancel it.
-            monitorExecutor.execute(new IntentInstallMonitor(operations));
+            executor.execute(new IntentInstallMonitor(operations));
         }
 
         @Override
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
index 0e55184..f18907c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
@@ -11,6 +11,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -159,6 +160,11 @@
     }
 
     @Override
+    public Map<String, Leadership> getLeaderBoard() {
+        return ImmutableMap.copyOf(leaderBoard);
+    }
+
+    @Override
     public void addListener(LeadershipEventListener listener) {
         checkArgument(listener != null);
         listeners.add(listener);