ONOS-2003 Fixing intent reroute after cluster change
Objective trackers now update when partitions are reshuffled so that
they track "local" intents.
Change-Id: I7cd9e4a935ddbc94813d5067d4febc084a89f508
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index b341165..09108d2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -30,7 +30,11 @@
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.ListenerRegistry;
import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionEvent;
+import org.onosproject.net.intent.PartitionEventListener;
import org.onosproject.net.intent.PartitionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +62,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventDeliveryService eventDispatcher;
+
protected final AtomicBoolean rebalanceScheduled = new AtomicBoolean(false);
static final int NUM_PARTITIONS = 14;
@@ -67,6 +74,7 @@
private static final String ELECTION_PREFIX = "intent-partition-";
+ private ListenerRegistry<PartitionEvent, PartitionEventListener> listenerRegistry;
private LeadershipEventListener leaderListener = new InternalLeadershipListener();
private ClusterEventListener clusterListener = new InternalClusterEventListener();
@@ -78,6 +86,9 @@
leadershipService.addListener(leaderListener);
clusterService.addListener(clusterListener);
+ listenerRegistry = new ListenerRegistry<>();
+ eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
+
for (int i = 0; i < NUM_PARTITIONS; i++) {
leadershipService.runForLeadership(getPartitionPath(i));
}
@@ -90,6 +101,7 @@
public void deactivate() {
executor.shutdownNow();
+ eventDispatcher.removeSink(PartitionEvent.class); // unregister the PartitionEvent sink that was added via addSink
leadershipService.removeListener(leaderListener);
clusterService.removeListener(clusterListener);
}
@@ -133,6 +145,16 @@
return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
}
+ @Override
+ public void addListener(PartitionEventListener listener) { // subscribes the listener to PartitionEvents
+ listenerRegistry.addListener(listener); // delegates to the registry installed as the PartitionEvent sink; NOTE(review): registry field has no initializer — confirm this is never called before it is assigned
+ }
+
+ @Override
+ public void removeListener(PartitionEventListener listener) { // unsubscribes the listener from PartitionEvents
+ listenerRegistry.removeListener(listener); // delegates to the same registry used by addListener; NOTE(review): registry field has no initializer — confirm this is never called before it is assigned
+ }
+
protected void doRebalance() {
rebalanceScheduled.set(false);
try {
@@ -203,6 +225,9 @@
// See if we need to let some partitions go
scheduleRebalance(0);
+
+ eventDispatcher.post(new PartitionEvent(PartitionEvent.Type.LEADER_CHANGED,
+ leadership.topic()));
}
}
}