LeadershipStore updates:
- Track the leader and the candidates for a topic in a single map.
- Use term numbers that are incremented by one each time a new leader is elected.
- Introduce a separate LeadershipStore to conform to the manager-store pattern.
Change-Id: I1d03a6c5e8ff0e68ef0c1e3a6c2d425c4856e470
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
index bfb4754..d2c63f3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
@@ -21,8 +21,6 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.Leadership;
@@ -76,7 +74,6 @@
private ListenerRegistry<IntentPartitionEvent, IntentPartitionEventListener> listenerRegistry;
private LeadershipEventListener leaderListener = new InternalLeadershipListener();
- private ClusterEventListener clusterListener = new InternalClusterEventListener();
private ScheduledExecutorService executor = Executors
.newScheduledThreadPool(1);
@@ -84,7 +81,6 @@
@Activate
public void activate() {
leadershipService.addListener(leaderListener);
- clusterService.addListener(clusterListener);
listenerRegistry = new ListenerRegistry<>();
eventDispatcher.addSink(IntentPartitionEvent.class, listenerRegistry);
@@ -103,7 +99,6 @@
eventDispatcher.removeSink(IntentPartitionEvent.class);
leadershipService.removeListener(leaderListener);
- clusterService.removeListener(clusterListener);
}
/**
@@ -180,7 +175,7 @@
List<Leadership> myPartitions = leadershipService.getLeaderBoard().values()
.stream()
- .filter(l -> clusterService.getLocalNode().id().equals(l.leader()))
+ .filter(l -> clusterService.getLocalNode().id().equals(l.leaderNodeId()))
.filter(l -> l.topic().startsWith(ELECTION_PREFIX))
.collect(Collectors.toList());
@@ -220,24 +215,16 @@
public void event(LeadershipEvent event) {
Leadership leadership = event.subject();
- if (Objects.equals(leadership.leader(), clusterService.getLocalNode().id()) &&
+ if (Objects.equals(leadership.leaderNodeId(), clusterService.getLocalNode().id()) &&
leadership.topic().startsWith(ELECTION_PREFIX)) {
- // See if we need to let some partitions go
- scheduleRebalance(0);
-
eventDispatcher.post(new IntentPartitionEvent(IntentPartitionEvent.Type.LEADER_CHANGED,
leadership.topic()));
}
- }
- }
- private final class InternalClusterEventListener implements
- ClusterEventListener {
-
- @Override
- public void event(ClusterEvent event) {
- scheduleRebalance(0);
+ if (event.type() == LeadershipEvent.Type.CANDIDATES_CHANGED) {
+ scheduleRebalance(0);
+ }
}
}
}