ONOS-2003 Fixing intent reroute after cluster change
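Objective trackers now update when partitions are reshuffled, so that
each tracks only its "local" intents. PartitionService implementations
accept PartitionEventListeners, and PartitionManager posts a
LEADER_CHANGED PartitionEvent to notify them.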
Change-Id: I7cd9e4a935ddbc94813d5067d4febc084a89f508
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
index b341165..09108d2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/PartitionManager.java
@@ -30,7 +30,11 @@
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.ListenerRegistry;
import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionEvent;
+import org.onosproject.net.intent.PartitionEventListener;
import org.onosproject.net.intent.PartitionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +62,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventDeliveryService eventDispatcher;
+
protected final AtomicBoolean rebalanceScheduled = new AtomicBoolean(false);
static final int NUM_PARTITIONS = 14;
@@ -67,6 +74,7 @@
private static final String ELECTION_PREFIX = "intent-partition-";
+ private ListenerRegistry<PartitionEvent, PartitionEventListener> listenerRegistry;
private LeadershipEventListener leaderListener = new InternalLeadershipListener();
private ClusterEventListener clusterListener = new InternalClusterEventListener();
@@ -78,6 +86,9 @@
leadershipService.addListener(leaderListener);
clusterService.addListener(clusterListener);
+ listenerRegistry = new ListenerRegistry<>();
+ eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
+
for (int i = 0; i < NUM_PARTITIONS; i++) {
leadershipService.runForLeadership(getPartitionPath(i));
}
@@ -90,6 +101,7 @@
public void deactivate() {
executor.shutdownNow();
+ eventDispatcher.removeSink(PartitionEvent.class);
leadershipService.removeListener(leaderListener);
clusterService.removeListener(clusterListener);
}
@@ -133,6 +145,16 @@
return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
}
+ @Override
+ public void addListener(PartitionEventListener listener) {
+ listenerRegistry.addListener(listener);
+ }
+
+ @Override
+ public void removeListener(PartitionEventListener listener) {
+ listenerRegistry.removeListener(listener);
+ }
+
protected void doRebalance() {
rebalanceScheduled.set(false);
try {
@@ -203,6 +225,9 @@
// See if we need to let some partitions go
scheduleRebalance(0);
+
+ eventDispatcher.post(new PartitionEvent(PartitionEvent.Type.LEADER_CHANGED,
+ leadership.topic()));
}
}
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
index 3b091c0..25e23d3 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
@@ -28,6 +28,7 @@
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.LeadershipServiceAdapter;
import org.onosproject.cluster.NodeId;
+import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.intent.Key;
import java.util.HashMap;
@@ -86,6 +87,7 @@
partitionManager.clusterService = new TestClusterService();
partitionManager.leadershipService = leadershipService;
+ partitionManager.eventDispatcher = new TestEventDispatcher();
}
/**
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
index 8fde858..0ef44d8 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
@@ -19,6 +19,8 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
@@ -28,7 +30,11 @@
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.ListenerRegistry;
import org.onosproject.net.intent.Key;
+import org.onosproject.net.intent.PartitionEvent;
+import org.onosproject.net.intent.PartitionEventListener;
import org.onosproject.net.intent.PartitionService;
import org.onosproject.store.AbstractStore;
import org.slf4j.Logger;
@@ -55,14 +61,24 @@
private final DateTime creationTime = DateTime.now();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventDeliveryService eventDispatcher;
+
+ private ListenerRegistry<PartitionEvent, PartitionEventListener> listenerRegistry;
+
@Activate
public void activate() {
instance = new DefaultControllerNode(new NodeId("local"), LOCALHOST);
+
+ listenerRegistry = new ListenerRegistry<>();
+ eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
+
log.info("Started");
}
@Deactivate
public void deactivate() {
+ eventDispatcher.removeSink(PartitionEvent.class);
log.info("Stopped");
}
@@ -110,4 +126,14 @@
public NodeId getLeader(Key intentKey) {
return instance.id();
}
+
+ @Override
+ public void addListener(PartitionEventListener listener) {
+ listenerRegistry.addListener(listener);
+ }
+
+ @Override
+ public void removeListener(PartitionEventListener listener) {
+ listenerRegistry.removeListener(listener);
+ }
}