ONOS-2003 Fixing intent reroute after cluster change

Objective trackers update when partitions are shuffled to
track "local" intents.

Change-Id: I7cd9e4a935ddbc94813d5067d4febc084a89f508
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipEventListener.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipEventListener.java
index 20f2c24..53d84b1 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipEventListener.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipEventListener.java
@@ -20,5 +20,5 @@
 /**
  * Entity capable of receiving device leadership-related events.
  */
-public interface LeadershipEventListener  extends EventListener<LeadershipEvent> {
+public interface LeadershipEventListener extends EventListener<LeadershipEvent> {
 }
diff --git a/core/api/src/main/java/org/onosproject/net/intent/PartitionEvent.java b/core/api/src/main/java/org/onosproject/net/intent/PartitionEvent.java
new file mode 100644
index 0000000..48623dd
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/intent/PartitionEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.intent;
+
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Partition event.
+ */
+//TODO change String into a proper object type
+public class PartitionEvent extends AbstractEvent<PartitionEvent.Type, String> {
+
+    public enum Type {
+        LEADER_CHANGED
+    }
+
+    public PartitionEvent(Type type, String partition) {
+        super(type, partition);
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/intent/PartitionEventListener.java b/core/api/src/main/java/org/onosproject/net/intent/PartitionEventListener.java
new file mode 100644
index 0000000..73b9de9
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/intent/PartitionEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.intent;
+
+import org.onosproject.event.EventListener;
+
+/**
+ * Entity capable of receiving device partition-related events.
+ */
+public interface PartitionEventListener extends EventListener<PartitionEvent> {
+}
diff --git a/core/api/src/main/java/org/onosproject/net/intent/PartitionService.java b/core/api/src/main/java/org/onosproject/net/intent/PartitionService.java
index e963abc..c636cd2 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/PartitionService.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/PartitionService.java
@@ -40,4 +40,18 @@
     NodeId getLeader(Key intentKey);
 
     // TODO add API for rebalancing partitions
+
+    /**
+     * Registers a event listener to be notified of partition events.
+     *
+     * @param listener listener that will asynchronously notified of partition events.
+     */
+    void addListener(PartitionEventListener listener);
+
+    /**
+     * Unregisters a event listener for partition events.
+     *
+     * @param listener listener to be removed.
+     */
+    void removeListener(PartitionEventListener listener);
 }
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java b/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
index 939b9a1..33f5a6e 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/ObjectiveTracker.java
@@ -23,6 +23,7 @@
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.event.Event;
@@ -38,8 +39,12 @@
 import org.onosproject.net.host.HostEvent;
 import org.onosproject.net.host.HostListener;
 import org.onosproject.net.host.HostService;
+import org.onosproject.net.intent.Intent;
 import org.onosproject.net.intent.IntentService;
 import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionEvent;
+import org.onosproject.net.intent.PartitionEventListener;
+import org.onosproject.net.intent.PartitionService;
 import org.onosproject.net.link.LinkEvent;
 import org.onosproject.net.resource.link.LinkResourceEvent;
 import org.onosproject.net.resource.link.LinkResourceListener;
@@ -54,6 +59,10 @@
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -94,25 +103,35 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected HostService hostService;
 
-    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+               policy = ReferencePolicy.DYNAMIC)
     protected IntentService intentService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected PartitionService partitionService;
+
     private ExecutorService executorService =
             newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker"));
+    private ScheduledExecutorService executor = Executors
+            .newScheduledThreadPool(1);
 
     private TopologyListener listener = new InternalTopologyListener();
     private LinkResourceListener linkResourceListener =
             new InternalLinkResourceListener();
     private DeviceListener deviceListener = new InternalDeviceListener();
     private HostListener hostListener = new InternalHostListener();
+    private PartitionEventListener partitionListener = new InternalPartitionListener();
     private TopologyChangeDelegate delegate;
 
+    protected final AtomicBoolean updateScheduled = new AtomicBoolean(false);
+
     @Activate
     public void activate() {
         topologyService.addListener(listener);
         resourceManager.addListener(linkResourceListener);
         deviceService.addListener(deviceListener);
         hostService.addListener(hostListener);
+        partitionService.addListener(partitionListener);
         log.info("Started");
     }
 
@@ -122,6 +141,7 @@
         resourceManager.removeListener(linkResourceListener);
         deviceService.removeListener(deviceListener);
         hostService.removeListener(hostListener);
+        partitionService.removeListener(partitionListener);
         log.info("Stopped");
     }
 
@@ -268,7 +288,7 @@
 
     private void updateTrackedResources(ApplicationId appId, boolean track) {
         if (intentService == null) {
-            log.debug("Intent service is not bound yet");
+            log.warn("Intent service is not bound yet");
             return;
         }
         intentService.getIntents().forEach(intent -> {
@@ -342,4 +362,52 @@
             executorService.execute(new DeviceAvailabilityHandler(id, available));
         }
     }
+
+    protected void doIntentUpdate() {
+        updateScheduled.set(false);
+        if (intentService == null) {
+            log.warn("Intent service is not bound yet");
+            return;
+        }
+        try {
+            //FIXME very inefficient
+            for (Intent intent : intentService.getIntents()) {
+                try {
+                    if (intentService.isLocal(intent.key())) {
+                        log.warn("intent {}, old: {}, new: {}",
+                                 intent.key(), intentsByDevice.values().contains(intent.key()), true);
+                        addTrackedResources(intent.key(), intent.resources());
+                        intentService.getInstallableIntents(intent.key()).stream()
+                                .forEach(installable ->
+                                                 addTrackedResources(intent.key(), installable.resources()));
+                    } else {
+                        log.warn("intent {}, old: {}, new: {}",
+                                 intent.key(), intentsByDevice.values().contains(intent.key()), false);
+                        removeTrackedResources(intent.key(), intent.resources());
+                        intentService.getInstallableIntents(intent.key()).stream()
+                                .forEach(installable ->
+                                                 removeTrackedResources(intent.key(), installable.resources()));
+                    }
+                } catch (NullPointerException npe) {
+                    log.warn("intent error {}", intent.key(), npe);
+                }
+            }
+        } catch (Exception e) {
+            log.warn("Exception caught during update task", e);
+        }
+    }
+
+    private void scheduleIntentUpdate(int afterDelaySec) {
+        if (updateScheduled.compareAndSet(false, true)) {
+            executor.schedule(this::doIntentUpdate, afterDelaySec, TimeUnit.SECONDS);
+        }
+    }
+
+    private final class InternalPartitionListener implements PartitionEventListener {
+        @Override
+        public void event(PartitionEvent event) {
+            log.warn("got message {}", event.subject());
+            scheduleIntentUpdate(1);
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index b341165..09108d2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -30,7 +30,11 @@
 import org.onosproject.cluster.LeadershipEventListener;
 import org.onosproject.cluster.LeadershipService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.ListenerRegistry;
 import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionEvent;
+import org.onosproject.net.intent.PartitionEventListener;
 import org.onosproject.net.intent.PartitionService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,6 +62,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventDeliveryService eventDispatcher;
+
     protected final AtomicBoolean rebalanceScheduled = new AtomicBoolean(false);
 
     static final int NUM_PARTITIONS = 14;
@@ -67,6 +74,7 @@
 
     private static final String ELECTION_PREFIX = "intent-partition-";
 
+    private ListenerRegistry<PartitionEvent, PartitionEventListener> listenerRegistry;
     private LeadershipEventListener leaderListener = new InternalLeadershipListener();
     private ClusterEventListener clusterListener = new InternalClusterEventListener();
 
@@ -78,6 +86,9 @@
         leadershipService.addListener(leaderListener);
         clusterService.addListener(clusterListener);
 
+        listenerRegistry = new ListenerRegistry<>();
+        eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
+
         for (int i = 0; i < NUM_PARTITIONS; i++) {
             leadershipService.runForLeadership(getPartitionPath(i));
         }
@@ -90,6 +101,7 @@
     public void deactivate() {
         executor.shutdownNow();
 
+        eventDispatcher.removeSink(PartitionEvent.class);
         leadershipService.removeListener(leaderListener);
         clusterService.removeListener(clusterListener);
     }
@@ -133,6 +145,16 @@
         return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
     }
 
+    @Override
+    public void addListener(PartitionEventListener listener) {
+        listenerRegistry.addListener(listener);
+    }
+
+    @Override
+    public void removeListener(PartitionEventListener listener) {
+        listenerRegistry.removeListener(listener);
+    }
+
     protected void doRebalance() {
         rebalanceScheduled.set(false);
         try {
@@ -203,6 +225,9 @@
 
                 // See if we need to let some partitions go
                 scheduleRebalance(0);
+
+                eventDispatcher.post(new PartitionEvent(PartitionEvent.Type.LEADER_CHANGED,
+                                                        leadership.topic()));
             }
         }
     }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
index 3b091c0..25e23d3 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
@@ -28,6 +28,7 @@
 import org.onosproject.cluster.LeadershipService;
 import org.onosproject.cluster.LeadershipServiceAdapter;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.net.intent.Key;
 
 import java.util.HashMap;
@@ -86,6 +87,7 @@
 
         partitionManager.clusterService = new TestClusterService();
         partitionManager.leadershipService = leadershipService;
+        partitionManager.eventDispatcher = new TestEventDispatcher();
     }
 
     /**
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
index 8fde858..0ef44d8 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
@@ -19,6 +19,8 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.joda.time.DateTime;
 import org.onlab.packet.IpAddress;
@@ -28,7 +30,11 @@
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.ListenerRegistry;
 import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionEvent;
+import org.onosproject.net.intent.PartitionEventListener;
 import org.onosproject.net.intent.PartitionService;
 import org.onosproject.store.AbstractStore;
 import org.slf4j.Logger;
@@ -55,14 +61,24 @@
 
     private final DateTime creationTime = DateTime.now();
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventDeliveryService eventDispatcher;
+
+    private ListenerRegistry<PartitionEvent, PartitionEventListener> listenerRegistry;
+
     @Activate
     public void activate() {
         instance = new DefaultControllerNode(new NodeId("local"), LOCALHOST);
+
+        listenerRegistry = new ListenerRegistry<>();
+        eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
+
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        eventDispatcher.removeSink(PartitionEvent.class);
         log.info("Stopped");
     }
 
@@ -110,4 +126,14 @@
     public NodeId getLeader(Key intentKey) {
         return instance.id();
     }
+
+    @Override
+    public void addListener(PartitionEventListener listener) {
+        listenerRegistry.addListener(listener);
+    }
+
+    @Override
+    public void removeListener(PartitionEventListener listener) {
+        listenerRegistry.removeListener(listener);
+    }
 }