ONOS-2003 Fixing intent reroute after cluster change

Objective trackers update when partitions are shuffled to
track "local" intents.

Change-Id: I7cd9e4a935ddbc94813d5067d4febc084a89f508
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index b341165..09108d2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -30,7 +30,11 @@
 import org.onosproject.cluster.LeadershipEventListener;
 import org.onosproject.cluster.LeadershipService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.ListenerRegistry;
 import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionEvent;
+import org.onosproject.net.intent.PartitionEventListener;
 import org.onosproject.net.intent.PartitionService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,6 +62,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventDeliveryService eventDispatcher;
+
     protected final AtomicBoolean rebalanceScheduled = new AtomicBoolean(false);
 
     static final int NUM_PARTITIONS = 14;
@@ -67,6 +74,7 @@
 
     private static final String ELECTION_PREFIX = "intent-partition-";
 
+    private ListenerRegistry<PartitionEvent, PartitionEventListener> listenerRegistry;
     private LeadershipEventListener leaderListener = new InternalLeadershipListener();
     private ClusterEventListener clusterListener = new InternalClusterEventListener();
 
@@ -78,6 +86,9 @@
         leadershipService.addListener(leaderListener);
         clusterService.addListener(clusterListener);
 
+        listenerRegistry = new ListenerRegistry<>();
+        eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
+
         for (int i = 0; i < NUM_PARTITIONS; i++) {
             leadershipService.runForLeadership(getPartitionPath(i));
         }
@@ -90,6 +101,7 @@
     public void deactivate() {
         executor.shutdownNow();
 
+        eventDispatcher.removeSink(PartitionEvent.class);
         leadershipService.removeListener(leaderListener);
         clusterService.removeListener(clusterListener);
     }
@@ -133,6 +145,16 @@
         return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
     }
 
+    @Override
+    public void addListener(PartitionEventListener listener) {
+        listenerRegistry.addListener(listener);
+    }
+
+    @Override
+    public void removeListener(PartitionEventListener listener) {
+        listenerRegistry.removeListener(listener);
+    }
+
     protected void doRebalance() {
         rebalanceScheduled.set(false);
         try {
@@ -203,6 +225,9 @@
 
                 // See if we need to let some partitions go
                 scheduleRebalance(0);
+
+                eventDispatcher.post(new PartitionEvent(PartitionEvent.Type.LEADER_CHANGED,
+                                                        leadership.topic()));
             }
         }
     }