ONOS-2003 Fixing intent reroute after cluster change

Objective trackers are now updated whenever partitions are shuffled,
so that they keep tracking "local" intents.

Change-Id: I7cd9e4a935ddbc94813d5067d4febc084a89f508
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index b341165..09108d2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -30,7 +30,11 @@
 import org.onosproject.cluster.LeadershipEventListener;
 import org.onosproject.cluster.LeadershipService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.ListenerRegistry;
 import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionEvent;
+import org.onosproject.net.intent.PartitionEventListener;
 import org.onosproject.net.intent.PartitionService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,6 +62,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventDeliveryService eventDispatcher;
+
     protected final AtomicBoolean rebalanceScheduled = new AtomicBoolean(false);
 
     static final int NUM_PARTITIONS = 14;
@@ -67,6 +74,7 @@
 
     private static final String ELECTION_PREFIX = "intent-partition-";
 
+    private ListenerRegistry<PartitionEvent, PartitionEventListener> listenerRegistry;
     private LeadershipEventListener leaderListener = new InternalLeadershipListener();
     private ClusterEventListener clusterListener = new InternalClusterEventListener();
 
@@ -78,6 +86,9 @@
         leadershipService.addListener(leaderListener);
         clusterService.addListener(clusterListener);
 
+        listenerRegistry = new ListenerRegistry<>();
+        eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
+
         for (int i = 0; i < NUM_PARTITIONS; i++) {
             leadershipService.runForLeadership(getPartitionPath(i));
         }
@@ -90,6 +101,7 @@
     public void deactivate() {
         executor.shutdownNow();
 
+        eventDispatcher.removeSink(PartitionEvent.class);
         leadershipService.removeListener(leaderListener);
         clusterService.removeListener(clusterListener);
     }
@@ -133,6 +145,16 @@
         return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
     }
 
+    @Override
+    public void addListener(PartitionEventListener listener) {
+        listenerRegistry.addListener(listener);
+    }
+
+    @Override
+    public void removeListener(PartitionEventListener listener) {
+        listenerRegistry.removeListener(listener);
+    }
+
     protected void doRebalance() {
         rebalanceScheduled.set(false);
         try {
@@ -203,6 +225,9 @@
 
                 // See if we need to let some partitions go
                 scheduleRebalance(0);
+
+                eventDispatcher.post(new PartitionEvent(PartitionEvent.Type.LEADER_CHANGED,
+                                                        leadership.topic()));
             }
         }
     }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
index 3b091c0..25e23d3 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
@@ -28,6 +28,7 @@
 import org.onosproject.cluster.LeadershipService;
 import org.onosproject.cluster.LeadershipServiceAdapter;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.net.intent.Key;
 
 import java.util.HashMap;
@@ -86,6 +87,7 @@
 
         partitionManager.clusterService = new TestClusterService();
         partitionManager.leadershipService = leadershipService;
+        partitionManager.eventDispatcher = new TestEventDispatcher();
     }
 
     /**
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
index 8fde858..0ef44d8 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
@@ -19,6 +19,8 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.joda.time.DateTime;
 import org.onlab.packet.IpAddress;
@@ -28,7 +30,11 @@
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.ListenerRegistry;
 import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionEvent;
+import org.onosproject.net.intent.PartitionEventListener;
 import org.onosproject.net.intent.PartitionService;
 import org.onosproject.store.AbstractStore;
 import org.slf4j.Logger;
@@ -55,14 +61,24 @@
 
     private final DateTime creationTime = DateTime.now();
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventDeliveryService eventDispatcher;
+
+    private ListenerRegistry<PartitionEvent, PartitionEventListener> listenerRegistry;
+
     @Activate
     public void activate() {
         instance = new DefaultControllerNode(new NodeId("local"), LOCALHOST);
+
+        listenerRegistry = new ListenerRegistry<>();
+        eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
+
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        eventDispatcher.removeSink(PartitionEvent.class);
         log.info("Stopped");
     }
 
@@ -110,4 +126,14 @@
     public NodeId getLeader(Key intentKey) {
         return instance.id();
     }
+
+    @Override
+    public void addListener(PartitionEventListener listener) {
+        listenerRegistry.addListener(listener);
+    }
+
+    @Override
+    public void removeListener(PartitionEventListener listener) {
+        listenerRegistry.removeListener(listener);
+    }
 }