Fix intent issues Yuta observed

Change-Id: I7dc4a19d49a1b3fc18ecce02a4018cbc9a3043fc
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
index b75e6a4..52d166c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
@@ -19,6 +19,9 @@
 import com.google.common.collect.Sets;
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.core.IQueue;
+import com.hazelcast.core.ItemEvent;
+import com.hazelcast.core.ItemListener;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -47,7 +50,6 @@
 import java.util.Map;
 import java.util.Set;
 
-
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -107,6 +109,9 @@
     @Deactivate
     public void deactivate() {
         leadershipService.removeListener(leaderListener);
+        for (ApplicationId appId: batchQueues.keySet()) {
+            leadershipService.withdraw(getTopic(appId)); // counterpart of runForLeadership(topic) in getQueue
+        }
         log.info("Stopped");
     }
 
@@ -125,12 +130,11 @@
         SQueue<IntentOperations> queue = batchQueues.get(appId);
         if (queue == null) {
             synchronized (this) {
-                // FIXME how will other instances find out about new queues
                 String topic = getTopic(appId);
                 IQueue<byte[]> rawQueue = theInstance.getQueue(topic);
                 queue = new SQueue<>(rawQueue, serializer);
+                queue.addItemListener(new InternalItemListener(appId), false);
                 batchQueues.putIfAbsent(appId, queue);
-                // TODO others should run for leadership when they hear about this topic
                 leadershipService.runForLeadership(topic);
             }
         }
@@ -209,6 +213,25 @@
         }
     }
 
+    private class InternalItemListener implements ItemListener<IntentOperations> {
+        // Listener attached by getQueue to one application's batch queue; appId identifies that queue.
+        private final ApplicationId appId;
+
+        public InternalItemListener(ApplicationId appId) {
+            this.appId = appId;
+        }
+
+        @Override
+        public void itemAdded(ItemEvent<IntentOperations> item) {
+            dispatchNextOperation(appId); // the event itself is not inspected; only appId is used
+        }
+
+        @Override
+        public void itemRemoved(ItemEvent<IntentOperations> item) {
+            // no-op: item removals are intentionally ignored
+        }
+    }
+
     private class InternalLeaderListener implements LeadershipEventListener {
         @Override
         public void event(LeadershipEvent event) {
@@ -220,6 +243,8 @@
                 return;         // Not our topic: ignore
             }
             if (!event.subject().leader().id().equals(localControllerNode.id())) {
+                // join the election: getQueue() runs for leadership when it first creates the queue
+                getQueue(getAppId(topic));
                 return;         // The event is not about this instance: ignore
             }