Adding Leadership Listener to IntentBatchService

ObjectiveTracker uses Leadership Listener to track intents that
it has become the leader of.

Change-Id: I039accb30d27ad718d79a9fec3f546dbdc78e62e
diff --git a/apps/demo/src/main/java/org/onlab/onos/demo/DemoInstaller.java b/apps/demo/src/main/java/org/onlab/onos/demo/DemoInstaller.java
index 5f1ea29..fd70a0b 100644
--- a/apps/demo/src/main/java/org/onlab/onos/demo/DemoInstaller.java
+++ b/apps/demo/src/main/java/org/onlab/onos/demo/DemoInstaller.java
@@ -266,9 +266,8 @@
                 }
                 count++;
                 if (count > ITERATIONMAX) {
-                    log.warn("A batch is stuck processing. current : {}" +
-                                     ", pending : {}",
-                             intentBatchService.getCurrentOperations(),
+                    log.warn("A batch is stuck processing. " +
+                                     "pending : {}",
                              intentBatchService.getPendingOperations());
                     shutdownAndAwaitTermination(randomWorker);
                 }
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchLeaderEvent.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchLeaderEvent.java
new file mode 100644
index 0000000..cd5297f
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchLeaderEvent.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.onos.net.intent;
+
+import org.onlab.onos.core.ApplicationId;
+import org.onlab.onos.event.AbstractEvent;
+
+/**
+ * A class to represent an intent related event.
+ */
+public class IntentBatchLeaderEvent extends AbstractEvent<IntentBatchLeaderEvent.Type, ApplicationId> {
+
+    public enum Type {
+        /**
+         * Signifies that this instance has become the leader for the given application id.
+         */
+        ELECTED,
+
+        /**
+         * Signifies that instance is no longer the leader for a given application id.
+         */
+        BOOTED
+    }
+
+    /**
+     * Creates an event of a given type and for the specified appId and the
+     * current time.
+     *
+     * @param type   event type
+     * @param appId  subject appId
+     * @param time   time the event created in milliseconds since start of epoch
+     */
+    public IntentBatchLeaderEvent(Type type, ApplicationId appId, long time) {
+        super(type, appId, time);
+    }
+
+    /**
+     * Creates an event of a given type and for the specified appId and the
+     * current time.
+     *
+     * @param type   event type
+     * @param appId subject appId
+     */
+    public IntentBatchLeaderEvent(Type type, ApplicationId appId) {
+        super(type, appId);
+    }
+
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchListener.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchListener.java
new file mode 100644
index 0000000..fb5c600
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchListener.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.onos.net.intent;
+
+import org.onlab.onos.event.EventListener;
+
+/**
+ * Listener for {@link org.onlab.onos.net.intent.IntentEvent intent events}.
+ */
+public interface IntentBatchListener extends EventListener<IntentBatchLeaderEvent> {
+}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchService.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchService.java
index 8a58aa2..246ece8 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchService.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentBatchService.java
@@ -45,14 +45,6 @@
     Set<IntentOperations> getPendingOperations();
 
     /**
-     * Returns the set of intent batches currently being processed.
-     * @return set of batches
-     */
-    //TODO we may want to get rid of this method
-    @Deprecated
-    Set<IntentOperations> getCurrentOperations();
-
-    /**
      * Return true if this instance is the local leader for batch
      * processing a given application id.
      *
@@ -75,4 +67,17 @@
      */
     void unsetDelegate(IntentBatchDelegate delegate);
 
+    /**
+     * Adds the specified listener for intent batch leadership events.
+     *
+     * @param listener listener to be added
+     */
+    void addListener(IntentBatchListener listener);
+
+    /**
+     * Removes the specified listener for intent batch leadership events.
+     *
+     * @param listener listener to be removed
+     */
+    void removeListener(IntentBatchListener listener);
 }
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java
index 22673b6..83a3ccb 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java
@@ -15,22 +15,25 @@
  */
 package org.onlab.onos.net.intent.impl;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.SetMultimap;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.core.ApplicationId;
 import org.onlab.onos.event.Event;
 import org.onlab.onos.net.Link;
 import org.onlab.onos.net.LinkKey;
 import org.onlab.onos.net.NetworkResource;
+import org.onlab.onos.net.intent.IntentBatchLeaderEvent;
+import org.onlab.onos.net.intent.IntentBatchListener;
+import org.onlab.onos.net.intent.IntentBatchService;
 import org.onlab.onos.net.intent.IntentId;
+import org.onlab.onos.net.intent.IntentService;
 import org.onlab.onos.net.link.LinkEvent;
 import org.onlab.onos.net.resource.LinkResourceEvent;
 import org.onlab.onos.net.resource.LinkResourceListener;
@@ -40,8 +43,10 @@
 import org.onlab.onos.net.topology.TopologyService;
 import org.slf4j.Logger;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.SetMultimap;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -72,18 +77,26 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected LinkResourceService resourceManager;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected IntentService intentService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected IntentBatchService batchService;
+
     private ExecutorService executorService =
             newSingleThreadExecutor(namedThreads("onos-flowtracker"));
 
     private TopologyListener listener = new InternalTopologyListener();
     private LinkResourceListener linkResourceListener =
             new InternalLinkResourceListener();
+    private final LeadershipListener leaderListener = new LeadershipListener();
     private TopologyChangeDelegate delegate;
 
     @Activate
     public void activate() {
         topologyService.addListener(listener);
         resourceManager.addListener(linkResourceListener);
+        batchService.addListener(leaderListener);
         log.info("Started");
     }
 
@@ -91,6 +104,7 @@
     public void deactivate() {
         topologyService.removeListener(listener);
         resourceManager.removeListener(linkResourceListener);
+        batchService.removeListener(leaderListener);
         log.info("Stopped");
     }
 
@@ -220,6 +234,38 @@
 
     //TODO consider adding flow rule event tracking
 
-    //FIXME the only intents that will be tracked are events that were
-    //executed on this instance. Need to have some backup trackers...
+    private void updateTrackedResources(ApplicationId appId, boolean track) {
+        intentService.getIntents().forEach(intent -> {
+            if (intent.appId().equals(appId)) {
+                IntentId id = intent.id();
+                Collection<NetworkResource> resources = Lists.newArrayList();
+                intentService.getInstallableIntents(id).stream()
+                        .map(installable -> installable.resources())
+                        .forEach(resources::addAll);
+                if (track) {
+                    addTrackedResources(id, resources);
+                } else {
+                    removeTrackedResources(id, resources);
+                }
+            }
+        });
+    }
+
+    private class LeadershipListener implements IntentBatchListener {
+        @Override
+        public void event(IntentBatchLeaderEvent event) {
+            log.debug("leadership event: {}", event);
+            ApplicationId appId = event.subject();
+            switch (event.type()) {
+                case ELECTED:
+                    updateTrackedResources(appId, true);
+                    break;
+                case BOOTED:
+                    updateTrackedResources(appId, false);
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
 }
diff --git a/core/net/src/test/java/org/onlab/onos/net/intent/impl/IntentManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/intent/impl/IntentManagerTest.java
index f27ddc1..ea7bc44 100644
--- a/core/net/src/test/java/org/onlab/onos/net/intent/impl/IntentManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/intent/impl/IntentManagerTest.java
@@ -111,16 +111,13 @@
         //the batch has not yet been removed when we receive the last event
         // FIXME: this doesn't guarantee to avoid the race
         for (int tries = 0; tries < 10; tries++) {
-            if (manager.batchService.getPendingOperations().isEmpty() &&
-                    manager.batchService.getCurrentOperations().isEmpty()) {
+            if (manager.batchService.getPendingOperations().isEmpty()) {
                 break;
             }
             delay(10);
         }
         assertTrue("There are still pending batch operations.",
                    manager.batchService.getPendingOperations().isEmpty());
-        assertTrue("There are still outstanding batch operations.",
-                   manager.batchService.getCurrentOperations().isEmpty());
 
         extensionService.unregisterCompiler(MockIntent.class);
         extensionService.unregisterInstaller(MockInstallableIntent.class);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
index 52d166c..9d9b833 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
@@ -35,7 +35,11 @@
 import org.onlab.onos.cluster.LeadershipService;
 import org.onlab.onos.core.ApplicationId;
 import org.onlab.onos.core.CoreService;
+import org.onlab.onos.event.AbstractListenerRegistry;
+import org.onlab.onos.event.EventDeliveryService;
 import org.onlab.onos.net.intent.IntentBatchDelegate;
+import org.onlab.onos.net.intent.IntentBatchLeaderEvent;
+import org.onlab.onos.net.intent.IntentBatchListener;
 import org.onlab.onos.net.intent.IntentBatchService;
 import org.onlab.onos.net.intent.IntentOperations;
 import org.onlab.onos.store.hz.SQueue;
@@ -46,7 +50,6 @@
 import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
@@ -74,6 +77,10 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StoreService storeService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventDeliveryService eventDispatcher;
+
+
     private HazelcastInstance theInstance;
     private ControllerNode localControllerNode;
     protected StoreSerializer serializer;
@@ -85,6 +92,9 @@
     private final Map<ApplicationId, IntentOperations> outstandingOps
             = Maps.newHashMap();
 
+    private final AbstractListenerRegistry<IntentBatchLeaderEvent, IntentBatchListener>
+            listenerRegistry = new AbstractListenerRegistry<>();
+
     @Activate
     public void activate() {
         theInstance = storeService.getHazelcastInstance();
@@ -103,11 +113,13 @@
             }
 
         };
+        eventDispatcher.addSink(IntentBatchLeaderEvent.class, listenerRegistry);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        eventDispatcher.removeSink(IntentBatchLeaderEvent.class);
         leadershipService.removeListener(leaderListener);
         for (ApplicationId appId: batchQueues.keySet()) {
             leadershipService.withdraw(getTopic(appId));
@@ -277,12 +289,6 @@
     }
 
     @Override
-    public Set<IntentOperations> getCurrentOperations() {
-        //FIXME this is not really implemented
-        return Collections.emptySet();
-    }
-
-    @Override
     public boolean isLocalLeader(ApplicationId applicationId) {
         return myTopics.contains(applicationId);
     }
@@ -298,4 +304,14 @@
             this.delegate = null;
         }
     }
+
+    @Override
+    public void addListener(IntentBatchListener listener) {
+        listenerRegistry.addListener(listener);
+    }
+
+    @Override
+    public void removeListener(IntentBatchListener listener) {
+        listenerRegistry.removeListener(listener);
+    }
 }
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java
index 57b854f..b6223c9 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentBatchQueue.java
@@ -22,6 +22,7 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.core.ApplicationId;
 import org.onlab.onos.net.intent.IntentBatchDelegate;
+import org.onlab.onos.net.intent.IntentBatchListener;
 import org.onlab.onos.net.intent.IntentBatchService;
 import org.onlab.onos.net.intent.IntentOperations;
 import org.slf4j.Logger;
@@ -83,14 +84,9 @@
     @Override
     public Set<IntentOperations> getPendingOperations() {
         synchronized (this) {
-            return Sets.newHashSet(pendingBatches);
-        }
-    }
-
-    @Override
-    public Set<IntentOperations> getCurrentOperations() {
-        synchronized (this) {
-            return Sets.newHashSet(currentBatches);
+            Set<IntentOperations> set = Sets.newHashSet(pendingBatches);
+            set.addAll(currentBatches); // TODO refactor this current vs. pending
+            return set;
         }
     }
 
@@ -110,4 +106,16 @@
             this.delegate = null;
         }
     }
+
+    @Override
+    public void addListener(IntentBatchListener listener) {
+        // no-op
+        //TODO: we are always the master
+    }
+
+    @Override
+    public void removeListener(IntentBatchListener listener) {
+        // no-op
+        //TODO: we are always the master
+    }
 }