fix intent issues yuta observed

Change-Id: I7dc4a19d49a1b3fc18ecce02a4018cbc9a3043fc
diff --git a/apps/demo/src/main/java/org/onlab/onos/demo/DemoAPI.java b/apps/demo/src/main/java/org/onlab/onos/demo/DemoAPI.java
index ff877da..34ab546 100644
--- a/apps/demo/src/main/java/org/onlab/onos/demo/DemoAPI.java
+++ b/apps/demo/src/main/java/org/onlab/onos/demo/DemoAPI.java
@@ -14,6 +14,7 @@
     /**
      * Installs intents based on the installation type.
      * @param type the installation type.
+     * @param runParams run params
      */
     void setup(InstallType type, Optional<JsonNode> runParams);
 
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/IntentPushTestCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/IntentPushTestCommand.java
index 32ea13f..5dd7736 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/IntentPushTestCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/IntentPushTestCommand.java
@@ -199,6 +199,7 @@
     /**
      * Returns application ID for the CLI.
      *
+     * @param id application id
      * @return command-line application identifier
      */
     protected ApplicationId appId(Integer id) {
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchService.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchService.java
index 762bbb8..8a58aa2 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchService.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchService.java
@@ -56,8 +56,8 @@
      * Return true if this instance is the local leader for batch
      * processing a given application id.
      *
-     * @param applicationId
-     * @return
+     * @param applicationId an application id
+     * @return true if this instance is the local leader for batch
      */
     boolean isLocalLeader(ApplicationId applicationId);
 
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentEvent.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentEvent.java
index 6d7a3b0..495f3ea 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/IntentEvent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentEvent.java
@@ -67,4 +67,32 @@
         super(type, intent);
     }
 
+    public static IntentEvent getEvent(IntentState state, Intent intent) {
+        Type type;
+        switch (state) {
+            case SUBMITTED:
+                type = Type.SUBMITTED;
+                break;
+            case INSTALLED:
+                type = Type.INSTALLED;
+                break;
+            case WITHDRAWN:
+                type = Type.WITHDRAWN;
+                break;
+            case FAILED:
+                type = Type.FAILED;
+                break;
+
+            //fallthrough to default from here
+            case COMPILING:
+            case INSTALLING:
+            case RECOMPILING:
+            case WITHDRAWING:
+            default:
+                throw new IllegalArgumentException(
+                        "Intent event cannot have transient state: " + state);
+        }
+        return new IntentEvent(type, intent);
+    }
+
 }
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentStore.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentStore.java
index 8dc70d8..442a247 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/IntentStore.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentStore.java
@@ -15,7 +15,6 @@
  */
 package org.onlab.onos.net.intent;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.onlab.onos.net.intent.IntentStore.BatchWrite.Operation;
@@ -42,16 +41,16 @@
      * mechanism.
      *
      * @param intent intent to be submitted
-     * @return event indicating the intent was submitted or null if no
-     * change resulted, e.g. duplicate intent
      */
-    IntentEvent createIntent(Intent intent);
+    @Deprecated
+    void createIntent(Intent intent);
 
     /**
      * Removes the specified intent from the inventory.
      *
      * @param intentId intent identification
      */
+    @Deprecated
     void removeIntent(IntentId intentId);
 
     /**
@@ -89,9 +88,8 @@
      *
      * @param intent   intent whose state is to be changed
      * @param newState new state
-     * @return state transition event
      */
-    IntentEvent setState(Intent intent, IntentState newState);
+    void setState(Intent intent, IntentState newState);
 
     /**
      * Sets the installable intents which resulted from compilation of the
@@ -129,64 +127,13 @@
         return new BatchWrite();
     }
 
-    // default implementation simply executes them sequentially.
-    // Store implementation should override and implement actual batch write.
     /**
      * Execute writes in a batch.
      *
      * @param batch BatchWrite to execute
      * @return failed operations
      */
-    default List<Operation> batchWrite(BatchWrite batch) {
-        List<Operation> failed = new ArrayList<>();
-        for (Operation op : batch.operations) {
-            switch (op.type) {
-            case CREATE_INTENT:
-                checkArgument(op.args.size() == 1,
-                              "CREATE_INTENT takes 1 argument. %s", op);
-                Intent intent = (Intent) op.args.get(0);
-                if (createIntent(intent) == null) {
-                    failed.add(op);
-                }
-                break;
-
-            case REMOVE_INTENT:
-                checkArgument(op.args.size() == 1,
-                              "REMOVE_INTENT takes 1 argument. %s", op);
-                IntentId intentId = (IntentId) op.args.get(0);
-                removeIntent(intentId);
-                break;
-
-            case REMOVE_INSTALLED:
-                checkArgument(op.args.size() == 1,
-                              "REMOVE_INSTALLED takes 1 argument. %s", op);
-                intentId = (IntentId) op.args.get(0);
-                removeInstalledIntents(intentId);
-                break;
-
-            case SET_INSTALLABLE:
-                checkArgument(op.args.size() == 2,
-                              "SET_INSTALLABLE takes 2 arguments. %s", op);
-                intentId = (IntentId) op.args.get(0);
-                @SuppressWarnings("unchecked")
-                List<Intent> installableIntents = (List<Intent>) op.args.get(1);
-                setInstallableIntents(intentId, installableIntents);
-                break;
-
-            case SET_STATE:
-                checkArgument(op.args.size() == 2,
-                              "SET_STATE takes 2 arguments. %s", op);
-                intent = (Intent) op.args.get(0);
-                IntentState newState = (IntentState) op.args.get(1);
-                setState(intent, newState);
-                break;
-
-            default:
-                break;
-            }
-        }
-        return failed;
-    }
+     List<Operation> batchWrite(BatchWrite batch);
 
     public static class BatchWrite {
 
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
index 2bcb809..d8c2dcd 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
@@ -128,7 +128,7 @@
         trackerService.setDelegate(topoDelegate);
         batchService.setDelegate(batchDelegate);
         eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
-        executor = newFixedThreadPool(NUM_THREADS, namedThreads("onos-intent-monitor"));
+        executor = newFixedThreadPool(NUM_THREADS, namedThreads("onos-intent"));
         idGenerator = coreService.getIdGenerator("intent-ids");
         Intent.bindIdGenerator(idGenerator);
         log.info("Started");
@@ -646,12 +646,11 @@
             return !isComplete() ? batches.get(currentBatch) : null;
         }
 
-        List<IntentEvent> batchSuccess(BatchWrite batchWrite) {
+        void batchSuccess(BatchWrite batchWrite) {
             // move on to next Batch
             if (++currentBatch == batches.size()) {
-                return finalizeStates(batchWrite);
+                finalizeStates(batchWrite);
             }
-            return Collections.emptyList();
         }
 
         void batchFailed() {
@@ -673,19 +672,16 @@
         }
 
         // FIXME make sure this is called!!!
-        private List<IntentEvent> finalizeStates(BatchWrite batchWrite) {
+        private void finalizeStates(BatchWrite batchWrite) {
             // events to be triggered on successful write
-            List<IntentEvent> events = new ArrayList<>();
             for (Intent intent : stateMap.keySet()) {
                 switch (getInflightState(intent)) {
                     case INSTALLING:
                         batchWrite.setState(intent, INSTALLED);
                         batchWrite.setInstallableIntents(newIntent.id(), newInstallables);
-                        events.add(new IntentEvent(Type.INSTALLED, intent));
                         break;
                     case WITHDRAWING:
                         batchWrite.setState(intent, WITHDRAWN);
-                        events.add(new IntentEvent(Type.WITHDRAWN, intent));
                         batchWrite.removeInstalledIntents(intent.id());
                         batchWrite.removeIntent(intent.id());
                         break;
@@ -705,7 +701,6 @@
                         break;
                 }
             }
-            return events;
         }
 
         List<FlowRuleBatchOperation> batches() {
@@ -737,10 +732,10 @@
                     intent.id(), oldState, newState);
 
             stateMap.put(intent, newState);
-            IntentEvent event = store.setState(intent, newState);
-            if (event != null) {
-                eventDispatcher.post(event);
-            }
+//            IntentEvent event = store.setState(intent, newState);
+//            if (event != null) {
+//                eventDispatcher.post(event);
+//            }
         }
 
         Map<Intent, IntentState> stateMap() {
@@ -822,7 +817,7 @@
                 BatchWrite batchWrite = store.newBatchWrite();
                 List<IntentEvent> events = new ArrayList<>();
                 for (IntentUpdate update : intentUpdates) {
-                    events.addAll(update.batchSuccess(batchWrite));
+                    update.batchSuccess(batchWrite);
                 }
                 if (!batchWrite.isEmpty()) {
                     store.batchWrite(batchWrite);
diff --git a/core/net/src/test/java/org/onlab/onos/net/intent/impl/IntentManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/intent/impl/IntentManagerTest.java
index 20240d4..f27ddc1 100644
--- a/core/net/src/test/java/org/onlab/onos/net/intent/impl/IntentManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/intent/impl/IntentManagerTest.java
@@ -5,6 +5,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+
 import org.hamcrest.Description;
 import org.hamcrest.Matchers;
 import org.hamcrest.TypeSafeMatcher;
@@ -40,6 +41,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -229,7 +231,8 @@
 
         public void await(IntentEvent.Type type) {
             try {
-                latchMap.get(type).await();
+                assertTrue("Timed out waiting for: " + type,
+                           latchMap.get(type).await(5, TimeUnit.SECONDS));
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/hz/SQueue.java b/core/store/dist/src/main/java/org/onlab/onos/store/hz/SQueue.java
index bce44e5..c58e411 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/hz/SQueue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/hz/SQueue.java
@@ -18,6 +18,7 @@
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
 import com.hazelcast.core.IQueue;
+import com.hazelcast.core.ItemEvent;
 import com.hazelcast.core.ItemListener;
 import com.hazelcast.monitor.LocalQueueStats;
 
@@ -201,16 +202,39 @@
         return q.getLocalQueueStats();
     }
 
-    @Deprecated // not implemented yet
+
     @Override
-    public String addItemListener(ItemListener<T> itemListener, boolean b) {
-        throw new UnsupportedOperationException();
+    public String addItemListener(ItemListener<T> itemListener, boolean withValue) {
+        ItemListener<byte[]> il = new ItemListener<byte[]>() {
+            @Override
+            public void itemAdded(ItemEvent<byte[]> item) {
+                itemListener.itemAdded(new ItemEvent<T>(getName(item),
+                                                        item.getEventType(),
+                                                        deserialize(item.getItem()),
+                                                        item.getMember()));
+            }
+
+            @Override
+            public void itemRemoved(ItemEvent<byte[]> item) {
+                itemListener.itemRemoved(new ItemEvent<T>(getName(item),
+                                                          item.getEventType(),
+                                                          deserialize(item.getItem()),
+                                                          item.getMember()));
+            }
+
+            private String getName(ItemEvent<byte[]> item) {
+                return (item.getSource() instanceof String) ?
+                        (String) item.getSource() : item.getSource().toString();
+
+            }
+        };
+        return q.addItemListener(il, withValue);
     }
 
-    @Deprecated // not implemented yet
+
     @Override
-    public boolean removeItemListener(String s) {
-        throw new UnsupportedOperationException();
+    public boolean removeItemListener(String registrationId) {
+        return q.removeItemListener(registrationId);
     }
 
     @Deprecated
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
index 8888001..cbb385b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/DistributedIntentStore.java
@@ -23,6 +23,7 @@
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableSet;
 
+import com.google.common.collect.Lists;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -187,15 +188,16 @@
     }
 
     @Override
-    public IntentEvent createIntent(Intent intent) {
+    public void createIntent(Intent intent) {
         Context timer = startTimer(createIntentTimer);
         try {
             boolean absent = intents.putIfAbsent(intent.id(), intent);
             if (!absent) {
                 // duplicate, ignore
-                return null;
+                return;
             } else {
-                return this.setState(intent, IntentState.SUBMITTED);
+                this.setState(intent, IntentState.SUBMITTED);
+                return;
             }
         } finally {
             stopTimer(timer);
@@ -273,7 +275,7 @@
     }
 
     @Override
-    public IntentEvent setState(Intent intent, IntentState state) {
+    public void setState(Intent intent, IntentState state) {
         Context timer = startTimer(setStateTimer);
         try {
             final IntentId id = intent.id();
@@ -341,10 +343,10 @@
                 log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
             }
 
-            if (evtType == null) {
-                return null;
+            if (evtType != null) {
+                notifyDelegate(new IntentEvent(evtType, intent));
             }
-            return new IntentEvent(evtType, intent);
+            return;
         } finally {
             stopTimer(timer);
         }
@@ -417,6 +419,7 @@
 
         List<Operation> failed = new ArrayList<>();
         final Builder builder = BatchWriteRequest.newBuilder();
+        List<IntentEvent> events = Lists.newArrayList();
 
         final Set<IntentId> transitionedToParking = new HashSet<>();
 
@@ -428,6 +431,7 @@
                 Intent intent = op.arg(0);
                 builder.putIfAbsent(INTENTS_TABLE, strIntentId(intent.id()), serializer.encode(intent));
                 builder.putIfAbsent(STATES_TABLE, strIntentId(intent.id()), serializer.encode(SUBMITTED));
+                events.add(IntentEvent.getEvent(SUBMITTED, intent));
                 break;
 
             case REMOVE_INTENT:
@@ -450,6 +454,7 @@
                 } else {
                     transitionedToParking.remove(intent.id());
                 }
+                events.add(IntentEvent.getEvent(newState, intent));
                 break;
 
             case SET_INSTALLABLE:
@@ -478,9 +483,11 @@
         if (batchWriteResult.isSuccessful()) {
             // no-failure (except for invalid input)
             transitionedToParking.forEach((intentId) -> transientStates.remove(intentId));
+            notifyDelegate(events);
             return failed;
         } else {
             // everything failed
+            // FIXME what to do with events?
             return batch.operations();
         }
     }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
index b75e6a4..52d166c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
@@ -19,6 +19,9 @@
 import com.google.common.collect.Sets;
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.core.IQueue;
+import com.hazelcast.core.ItemEvent;
+import com.hazelcast.core.ItemListener;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -47,7 +50,6 @@
 import java.util.Map;
 import java.util.Set;
 
-
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -107,6 +109,9 @@
     @Deactivate
     public void deactivate() {
         leadershipService.removeListener(leaderListener);
+        for (ApplicationId appId: batchQueues.keySet()) {
+            leadershipService.withdraw(getTopic(appId));
+        }
         log.info("Stopped");
     }
 
@@ -125,12 +130,11 @@
         SQueue<IntentOperations> queue = batchQueues.get(appId);
         if (queue == null) {
             synchronized (this) {
-                // FIXME how will other instances find out about new queues
                 String topic = getTopic(appId);
                 IQueue<byte[]> rawQueue = theInstance.getQueue(topic);
                 queue = new SQueue<>(rawQueue, serializer);
+                queue.addItemListener(new InternalItemListener(appId), false);
                 batchQueues.putIfAbsent(appId, queue);
-                // TODO others should run for leadership when they hear about this topic
                 leadershipService.runForLeadership(topic);
             }
         }
@@ -209,6 +213,25 @@
         }
     }
 
+    private class InternalItemListener implements ItemListener<IntentOperations> {
+
+        private final ApplicationId appId;
+
+        public InternalItemListener(ApplicationId appId) {
+            this.appId = appId;
+        }
+
+        @Override
+        public void itemAdded(ItemEvent<IntentOperations> item) {
+            dispatchNextOperation(appId);
+        }
+
+        @Override
+        public void itemRemoved(ItemEvent<IntentOperations> item) {
+            // no-op
+        }
+    }
+
     private class InternalLeaderListener implements LeadershipEventListener {
         @Override
         public void event(LeadershipEvent event) {
@@ -220,6 +243,8 @@
                 return;         // Not our topic: ignore
             }
             if (!event.subject().leader().id().equals(localControllerNode.id())) {
+                // run for leadership
+                getQueue(getAppId(topic));
                 return;         // The event is not about this instance: ignore
             }
 
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
index 9b9e2d1..f09a968 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentStore.java
@@ -20,6 +20,7 @@
 import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.hazelcast.core.EntryAdapter;
 import com.hazelcast.core.EntryEvent;
 import com.hazelcast.core.EntryListener;
@@ -171,15 +172,16 @@
     }
 
     @Override
-    public IntentEvent createIntent(Intent intent) {
+    public void createIntent(Intent intent) {
         Context timer = startTimer(createIntentTimer);
         try {
             Intent existing = intents.putIfAbsent(intent.id(), intent);
             if (existing != null) {
                 // duplicate, ignore
-                return null;
+                return;
             } else {
-                return this.setState(intent, IntentState.SUBMITTED);
+                this.setState(intent, IntentState.SUBMITTED);
+                return;
             }
         } finally {
             stopTimer(timer);
@@ -256,7 +258,7 @@
     }
 
     @Override
-    public IntentEvent setState(Intent intent, IntentState state) {
+    public void setState(Intent intent, IntentState state) {
         Context timer = startTimer(setStateTimer);
         try {
 
@@ -311,10 +313,10 @@
             final IntentState prevTransient = transientStates.put(id, state);
             log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
 
-            if (type == null) {
-                return null;
+            if (type != null) {
+                notifyDelegate(new IntentEvent(type, intent));
             }
-            return new IntentEvent(type, intent);
+            return;
         } finally {
             stopTimer(timer);
         }
@@ -358,6 +360,7 @@
         List<Operation> failed = new ArrayList<>();
 
         List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
+        List<IntentEvent> events = Lists.newArrayList();
 
         for (Operation op : batch.operations()) {
             switch (op.type()) {
@@ -434,6 +437,7 @@
                                  prevIntent, prevIntentState,
                                  intent, newIntentState);
                     }
+                    events.add(IntentEvent.getEvent(SUBMITTED, intent));
                 } catch (InterruptedException e) {
                     log.error("Batch write was interrupted while processing {}", op,  e);
                     failed.add(op);
@@ -487,6 +491,8 @@
                     if (PARKING.contains(newState)) {
                         transientStates.remove(intentId);
                     }
+                    events.add(IntentEvent.getEvent(newState, intent));
+
                     log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
                     // TODO sanity check and log?
                 } catch (InterruptedException e) {
@@ -554,6 +560,9 @@
                 break;
             }
         }
+
+        notifyDelegate(events);
+
         return failed;
     }
 
@@ -571,6 +580,8 @@
                     log.debug("{} state updated remotely, removing transient state {}",
                               intentId, oldState);
                 }
+
+                notifyDelegate(IntentEvent.getEvent(event.getValue(), getIntent(intentId)));
             }
         }
     }
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentStore.java
index 51ee166..475beb5 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentStore.java
@@ -16,6 +16,8 @@
 package org.onlab.onos.store.trivial.impl;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -26,6 +28,7 @@
 import org.onlab.onos.net.intent.IntentState;
 import org.onlab.onos.net.intent.IntentStore;
 import org.onlab.onos.net.intent.IntentStoreDelegate;
+import org.onlab.onos.net.intent.IntentStore.BatchWrite.Operation;
 import org.onlab.onos.store.AbstractStore;
 import org.slf4j.Logger;
 
@@ -33,6 +36,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static org.onlab.onos.net.intent.IntentState.WITHDRAWN;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -60,12 +64,13 @@
     }
 
     @Override
-    public IntentEvent createIntent(Intent intent) {
+    public void createIntent(Intent intent) {
         if (intents.containsKey(intent.id())) {
-            return null;
+            return;
         }
         intents.put(intent.id(), intent);
-        return this.setState(intent, IntentState.SUBMITTED);
+        this.setState(intent, IntentState.SUBMITTED);
+        return;
     }
 
     @Override
@@ -98,7 +103,7 @@
     }
 
     @Override
-    public IntentEvent setState(Intent intent, IntentState state) {
+    public void setState(Intent intent, IntentState state) {
         IntentId id = intent.id();
         states.put(id, state);
         IntentEvent.Type type = null;
@@ -119,10 +124,9 @@
         default:
             break;
         }
-        if (type == null) {
-            return null;
+        if (type != null) {
+            notifyDelegate(new IntentEvent(type, intent));
         }
-        return new IntentEvent(type, intent);
     }
 
     @Override
@@ -139,5 +143,60 @@
     public void removeInstalledIntents(IntentId intentId) {
         installable.remove(intentId);
     }
+    /**
+     * Execute writes in a batch.
+     *
+     * @param batch BatchWrite to execute
+     * @return failed operations
+     */
+    @Override
+    public List<Operation> batchWrite(BatchWrite batch) {
+        List<Operation> failed = Lists.newArrayList();
+        for (Operation op : batch.operations()) {
+            switch (op.type()) {
+            case CREATE_INTENT:
+                checkArgument(op.args().size() == 1,
+                              "CREATE_INTENT takes 1 argument. %s", op);
+                Intent intent = (Intent) op.args().get(0);
+                // TODO: what if it failed?
+                createIntent(intent);
+                break;
 
+            case REMOVE_INTENT:
+                checkArgument(op.args().size() == 1,
+                              "REMOVE_INTENT takes 1 argument. %s", op);
+                IntentId intentId = (IntentId) op.args().get(0);
+                removeIntent(intentId);
+                break;
+
+            case REMOVE_INSTALLED:
+                checkArgument(op.args().size() == 1,
+                              "REMOVE_INSTALLED takes 1 argument. %s", op);
+                intentId = (IntentId) op.args().get(0);
+                removeInstalledIntents(intentId);
+                break;
+
+            case SET_INSTALLABLE:
+                checkArgument(op.args().size() == 2,
+                              "SET_INSTALLABLE takes 2 arguments. %s", op);
+                intentId = (IntentId) op.args().get(0);
+                @SuppressWarnings("unchecked")
+                List<Intent> installableIntents = (List<Intent>) op.args().get(1);
+                setInstallableIntents(intentId, installableIntents);
+                break;
+
+            case SET_STATE:
+                checkArgument(op.args().size() == 2,
+                              "SET_STATE takes 2 arguments. %s", op);
+                intent = (Intent) op.args().get(0);
+                IntentState newState = (IntentState) op.args().get(1);
+                setState(intent, newState);
+                break;
+
+            default:
+                break;
+            }
+        }
+        return failed;
+    }
 }
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLeadershipManager.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLeadershipManager.java
index c30e744..cbebcda 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLeadershipManager.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLeadershipManager.java
@@ -19,7 +19,7 @@
 
 /**
  * A trivial implementation of the leadership service.
- * <p></p>
+ * <p>
  * The service is not distributed, so it can assume there's a single leadership
  * contender. This contender is always granted leadership whenever it asks.
  */