LeadershipStore updates:
- Now tracking leader and candidates for a topic using a single map.
- Using term numbers that are incremented by one every time a new leader is elected.
- Introduced a separate LeadershipStore to conform to the manager-store pattern
Change-Id: I1d03a6c5e8ff0e68ef0c1e3a6c2d425c4856e470
diff --git a/apps/cip/src/main/java/org/onosproject/cip/ClusterIpManager.java b/apps/cip/src/main/java/org/onosproject/cip/ClusterIpManager.java
index a3cf7b3..a6f3d1c 100644
--- a/apps/cip/src/main/java/org/onosproject/cip/ClusterIpManager.java
+++ b/apps/cip/src/main/java/org/onosproject/cip/ClusterIpManager.java
@@ -16,6 +16,7 @@
package org.onosproject.cip;
import com.google.common.io.ByteStreams;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -93,7 +94,7 @@
cfgService.registerProperties(getClass());
localId = clusterService.getLocalNode().id();
- processLeadershipChange(leadershipService.getLeader(CLUSTER_IP));
+ processLeaderChange(leadershipService.getLeader(CLUSTER_IP));
leadershipService.addListener(listener);
leadershipService.runForLeadership(CLUSTER_IP);
@@ -137,10 +138,7 @@
}
}
- private synchronized void processLeadershipChange(NodeId newLeader) {
- if (newLeader == null) {
- return;
- }
+ private synchronized void processLeaderChange(NodeId newLeader) {
boolean isLeader = Objects.equals(newLeader, localId);
log.info("Processing leadership change; wasLeader={}, isLeader={}", wasLeader, isLeader);
if (!wasLeader && isLeader) {
@@ -189,11 +187,15 @@
// Listens for leadership changes.
private class InternalLeadershipListener implements LeadershipEventListener {
+
+ @Override
+ public boolean isRelevant(LeadershipEvent event) {
+ return CLUSTER_IP.equals(event.subject().topic());
+ }
+
@Override
public void event(LeadershipEvent event) {
- if (event.subject().topic().equals(CLUSTER_IP)) {
- processLeadershipChange(event.subject().leader());
- }
+ processLeaderChange(event.subject().leaderNodeId());
}
}
diff --git a/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java b/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java
index bcf4e2e..b2a33a9 100644
--- a/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java
+++ b/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java
@@ -19,6 +19,7 @@
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -105,10 +106,7 @@
log.info("Stopped");
}
- private synchronized void processLeadershipChange(NodeId newLeader) {
- if (newLeader == null) {
- return;
- }
+ private synchronized void processLeaderChange(NodeId newLeader) {
boolean currLeader = newLeader.equals(localId);
if (isLeader.getAndSet(currLeader) != currLeader) {
if (currLeader) {
@@ -159,7 +157,7 @@
@Override
public void event(LeadershipEvent event) {
- processLeadershipChange(event.subject().leader());
+ processLeaderChange(event.subject().leaderNodeId());
}
}
-}
\ No newline at end of file
+}
diff --git a/apps/routing/src/main/java/org/onosproject/routing/impl/IntentSynchronizer.java b/apps/routing/src/main/java/org/onosproject/routing/impl/IntentSynchronizer.java
index 22c7d43..1811049 100644
--- a/apps/routing/src/main/java/org/onosproject/routing/impl/IntentSynchronizer.java
+++ b/apps/routing/src/main/java/org/onosproject/routing/impl/IntentSynchronizer.java
@@ -26,6 +26,7 @@
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.intent.Intent;
@@ -43,7 +44,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -74,6 +74,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentService intentService;
+ private NodeId localNodeId;
private ApplicationId appId;
private final InternalLeadershipListener leadershipEventListener =
@@ -89,7 +90,7 @@
@Activate
public void activate() {
intentsSynchronizerExecutor = createExecutor();
-
+ this.localNodeId = clusterService.getLocalNode().id();
this.appId = coreService.registerApplication(APP_NAME);
leadershipService.addListener(leadershipEventListener);
@@ -268,27 +269,22 @@
private class InternalLeadershipListener implements LeadershipEventListener {
@Override
- public void event(LeadershipEvent event) {
- if (!event.subject().topic().equals(appId.name())) {
- // Not our topic: ignore
- return;
- }
- if (!Objects.equals(event.subject().leader(),
- clusterService.getLocalNode().id())) {
- // The event is not about this instance: ignore
- return;
- }
+ public boolean isRelevant(LeadershipEvent event) {
+ return event.subject().topic().equals(appId.name());
+ }
+ @Override
+ public void event(LeadershipEvent event) {
switch (event.type()) {
- case LEADER_ELECTED:
- log.info("IntentSynchronizer gained leadership");
- leaderChanged(true);
- break;
- case LEADER_BOOTED:
- log.info("IntentSynchronizer lost leadership");
- leaderChanged(false);
- break;
- case LEADER_REELECTED:
+ case LEADER_CHANGED:
+ case LEADER_AND_CANDIDATES_CHANGED:
+ if (localNodeId.equals(event.subject().leaderNodeId())) {
+ log.info("IntentSynchronizer gained leadership");
+ leaderChanged(true);
+ } else {
+ log.info("IntentSynchronizer leader changed. New leader is {}", event.subject().leaderNodeId());
+ leaderChanged(false);
+ }
default:
break;
}
diff --git a/apps/routing/src/test/java/org/onosproject/routing/impl/IntentSynchronizerTest.java b/apps/routing/src/test/java/org/onosproject/routing/impl/IntentSynchronizerTest.java
index d265323..351a099 100644
--- a/apps/routing/src/test/java/org/onosproject/routing/impl/IntentSynchronizerTest.java
+++ b/apps/routing/src/test/java/org/onosproject/routing/impl/IntentSynchronizerTest.java
@@ -17,6 +17,7 @@
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
+
import org.junit.Before;
import org.junit.Test;
import org.onlab.junit.TestUtils;
@@ -29,7 +30,11 @@
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onosproject.TestApplicationId;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreServiceAdapter;
import org.onosproject.incubator.net.intf.Interface;
@@ -94,6 +99,9 @@
private static final ApplicationId APPID =
TestApplicationId.create("intent-sync-test");
+ private static final ControllerNode LOCAL_NODE =
+ new DefaultControllerNode(new NodeId("foo"), IpAddress.valueOf("127.0.0.1"));
+
@Before
public void setUp() throws Exception {
super.setUp();
@@ -105,6 +113,7 @@
intentSynchronizer = new TestIntentSynchronizer();
intentSynchronizer.coreService = new TestCoreService();
+ intentSynchronizer.clusterService = new TestClusterService();
intentSynchronizer.leadershipService = new TestLeadershipService();
intentSynchronizer.intentService = intentService;
@@ -441,6 +450,13 @@
}
}
+ private class TestClusterService extends ClusterServiceAdapter {
+ @Override
+ public ControllerNode getLocalNode() {
+ return LOCAL_NODE;
+ }
+ }
+
private class TestLeadershipService extends LeadershipServiceAdapter {
}
diff --git a/apps/test/election/src/main/java/org/onosproject/election/ElectionTest.java b/apps/test/election/src/main/java/org/onosproject/election/ElectionTest.java
index b2a7207..e0d3b27 100644
--- a/apps/test/election/src/main/java/org/onosproject/election/ElectionTest.java
+++ b/apps/test/election/src/main/java/org/onosproject/election/ElectionTest.java
@@ -24,12 +24,11 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.cluster.ClusterService;
import org.onosproject.core.CoreService;
-import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
-
import org.slf4j.Logger;
@@ -56,7 +55,7 @@
private LeadershipEventListener leadershipEventListener =
new InnerLeadershipEventListener();
- private ControllerNode localControllerNode;
+ private NodeId localNodeId;
@Activate
@@ -65,7 +64,7 @@
appId = coreService.registerApplication(ELECTION_APP);
- localControllerNode = clusterService.getLocalNode();
+ localNodeId = clusterService.getLocalNode().id();
leadershipService.addListener(leadershipEventListener);
}
@@ -100,20 +99,10 @@
log.debug("Leadership Event: time = {} type = {} event = {}",
event.time(), event.type(), event);
- if (!event.subject().leader().equals(
- localControllerNode.id())) {
- return; // The event is not about this instance: ignore
- }
-
switch (event.type()) {
- case LEADER_ELECTED:
- log.info("Election-test app leader elected");
- break;
- case LEADER_BOOTED:
- log.info("Election-test app lost election");
- break;
- case LEADER_REELECTED:
- log.debug("Election-test app was re-elected");
+ case LEADER_CHANGED:
+ case LEADER_AND_CANDIDATES_CHANGED:
+ log.info("Election-test app leader changed. New leadership: {}", event.subject());
break;
default:
break;