LeadershipService API change: Using NodeId in place of ControllerNode.
Change-Id: I6f688506c3672977456fc6921b26e98be2239632
diff --git a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
index eddf7e4..4c81c15 100644
--- a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
+++ b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
@@ -167,7 +167,7 @@
if (!event.subject().topic().equals(appId.name())) {
return; // Not our topic: ignore
}
- if (!event.subject().leader().id().equals(
+ if (!event.subject().leader().equals(
localControllerNode.id())) {
return; // The event is not about this instance: ignore
}
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/LeaderCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/LeaderCommand.java
index 7ea3ebb..ec99614 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/LeaderCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/LeaderCommand.java
@@ -37,7 +37,7 @@
Map<String, Leadership> leaderBoard = leaderService.getLeaderBoard();
print(FMT, "Topic", "Leader");
for (String topic : leaderBoard.keySet()) {
- print(FMT, topic, leaderBoard.get(topic).leader().id());
+ print(FMT, topic, leaderBoard.get(topic).leader());
}
}
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/Leadership.java b/core/api/src/main/java/org/onlab/onos/cluster/Leadership.java
index cf247f5..6be7c9b 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/Leadership.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/Leadership.java
@@ -10,10 +10,10 @@
public class Leadership {
private final String topic;
- private final ControllerNode leader;
+ private final NodeId leader;
private final long epoch;
- public Leadership(String topic, ControllerNode leader, long epoch) {
+ public Leadership(String topic, NodeId leader, long epoch) {
this.topic = topic;
this.leader = leader;
this.epoch = epoch;
@@ -28,10 +28,10 @@
}
/**
- * The leader for this topic.
+ * The nodeId of leader for this topic.
* @return leader node.
*/
- public ControllerNode leader() {
+ public NodeId leader() {
return leader;
}
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java
index bb89572..2709b7f 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java
@@ -29,9 +29,9 @@
/**
* Gets the most recent leader for the topic.
* @param path topic
- * @return node who is the leader, null if so such topic exists.
+ * @return nodeId of the leader, null if so such topic exists.
*/
- ControllerNode getLeader(String path);
+ NodeId getLeader(String path);
/**
* Joins the leadership contest.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/HazelcastLeadershipService.java
index 928ebb1..801b0b3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/HazelcastLeadershipService.java
@@ -31,7 +31,6 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
-import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.Leadership;
import org.onlab.onos.cluster.LeadershipEvent;
import org.onlab.onos.cluster.LeadershipEventListener;
@@ -100,11 +99,11 @@
private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
listenerRegistry;
private final Map<String, Topic> topics = Maps.newConcurrentMap();
- private ControllerNode localNode;
+ private NodeId localNodeId;
@Activate
protected void activate() {
- localNode = clusterService.getLocalNode();
+ localNodeId = clusterService.getLocalNode().id();
listenerRegistry = new AbstractListenerRegistry<>();
eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
@@ -124,7 +123,7 @@
}
@Override
- public ControllerNode getLeader(String path) {
+ public NodeId getLeader(String path) {
Topic topic = topics.get(path);
if (topic == null) {
return null;
@@ -177,7 +176,7 @@
private volatile long lastLeadershipUpdateMs = 0;
private ExecutorService leaderElectionExecutor;
- private ControllerNode leader;
+ private NodeId leader;
private Lock leaderLock;
private Future<?> getLockFuture;
private Future<?> periodicProcessingFuture;
@@ -198,7 +197,7 @@
*
* @return the leader for the topic
*/
- private ControllerNode leader() {
+ private NodeId leader() {
return leader;
}
@@ -254,7 +253,7 @@
public void onMessage(Message<byte[]> message) {
LeadershipEvent leadershipEvent =
SERIALIZER.decode(message.getMessageObject());
- NodeId eventLeaderId = leadershipEvent.subject().leader().id();
+ NodeId eventLeaderId = leadershipEvent.subject().leader();
log.debug("Leadership Event: time = {} type = {} event = {}",
leadershipEvent.time(), leadershipEvent.type(),
@@ -262,7 +261,7 @@
if (!leadershipEvent.subject().topic().equals(topicName)) {
return; // Not our topic: ignore
}
- if (eventLeaderId.equals(localNode.id())) {
+ if (eventLeaderId.equals(localNodeId)) {
return; // My own message: ignore
}
@@ -276,7 +275,7 @@
// leadership and run for re-election.
//
if ((leader != null) &&
- leader.id().equals(localNode.id())) {
+ leader.equals(localNodeId)) {
getLockFuture.cancel(true);
} else {
// Just update the current leader
@@ -288,7 +287,7 @@
case LEADER_BOOTED:
// Remove the state for the current leader
if ((leader != null) &&
- eventLeaderId.equals(leader.id())) {
+ eventLeaderId.equals(leader)) {
leader = null;
}
eventDispatcher.post(leadershipEvent);
@@ -312,13 +311,13 @@
synchronized (this) {
LeadershipEvent leadershipEvent;
if (leader != null) {
- if (leader.id().equals(localNode.id())) {
+ if (leader.equals(localNodeId)) {
//
// Advertise ourselves as the leader
//
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_REELECTED,
- new Leadership(topicName, localNode, 0));
+ new Leadership(topicName, localNodeId, 0));
// Dispatch to all remote instances
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
} else {
@@ -379,10 +378,10 @@
// This instance is now the leader
//
log.info("Leader Elected for topic {}", topicName);
- leader = localNode;
+ leader = localNodeId;
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
- new Leadership(topicName, localNode, 0));
+ new Leadership(topicName, localNodeId, 0));
eventDispatcher.post(leadershipEvent);
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
}
@@ -403,12 +402,12 @@
// If we reach here, we should release the leadership
log.debug("Leader Lock Released for topic {}", topicName);
if ((leader != null) &&
- leader.id().equals(localNode.id())) {
+ leader.equals(localNodeId)) {
leader = null;
}
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(topicName, localNode, 0));
+ new Leadership(topicName, localNodeId, 0));
eventDispatcher.post(leadershipEvent);
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
leaderLock.unlock();
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
index e490c48..ee447fd 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
@@ -11,7 +11,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import com.google.common.collect.ImmutableMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -19,11 +18,11 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
-import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.Leadership;
import org.onlab.onos.cluster.LeadershipEvent;
import org.onlab.onos.cluster.LeadershipEventListener;
import org.onlab.onos.cluster.LeadershipService;
+import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
@@ -36,6 +35,7 @@
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -77,7 +77,7 @@
private final Map<String, Lock> openContests = Maps.newConcurrentMap();
private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
- private ControllerNode localNode;
+ private NodeId localNodeId;
private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
@@ -94,7 +94,7 @@
@Activate
public void activate() {
- localNode = clusterService.getLocalNode();
+ localNodeId = clusterService.getLocalNode().id();
addListener(peerAdvertiser);
addListener(leaderBoardUpdater);
@@ -120,7 +120,7 @@
@Override
- public ControllerNode getLeader(String path) {
+ public NodeId getLeader(String path) {
synchronized (leaderBoard) {
Leadership leadership = leaderBoard.get(path);
if (leadership != null) {
@@ -155,7 +155,7 @@
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(lock.path(), localNode, lock.epoch())));
+ new Leadership(lock.path(), localNodeId, lock.epoch())));
}
}
@@ -201,7 +201,7 @@
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
- new Leadership(lock.path(), localNode, lock.epoch())));
+ new Leadership(lock.path(), localNodeId, lock.epoch())));
return;
} else {
log.warn("Failed to acquire lock for {}. Will retry in {} ms", path, WAIT_BEFORE_RETRY_MS, error);
@@ -236,7 +236,7 @@
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_REELECTED,
- new Leadership(lock.path(), localNode, lock.epoch())));
+ new Leadership(lock.path(), localNodeId, lock.epoch())));
threadPool.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
} else {
// Check if this node already withdrew from the contest, in which case
@@ -245,7 +245,7 @@
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(lock.path(), localNode, lock.epoch())));
+ new Leadership(lock.path(), localNodeId, lock.epoch())));
// Retry leadership after a brief wait.
threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
}
@@ -270,11 +270,11 @@
@Override
public void event(LeadershipEvent event) {
// publish events originating on this host.
- if (event.subject().leader().equals(localNode)) {
+ if (event.subject().leader().equals(localNodeId)) {
try {
clusterCommunicator.broadcast(
new ClusterMessage(
- localNode.id(),
+ localNodeId,
LEADERSHIP_UPDATES,
SERIALIZER.encode(event)));
} catch (IOException e) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
index 9d9b833..70bf95b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/intent/impl/HazelcastIntentBatchQueue.java
@@ -15,12 +15,12 @@
*/
package org.onlab.onos.store.intent.impl;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IQueue;
-import com.hazelcast.core.ItemEvent;
-import com.hazelcast.core.ItemListener;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Map;
+import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -29,10 +29,10 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
-import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.LeadershipEvent;
import org.onlab.onos.cluster.LeadershipEventListener;
import org.onlab.onos.cluster.LeadershipService;
+import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.core.CoreService;
import org.onlab.onos.event.AbstractListenerRegistry;
@@ -50,12 +50,12 @@
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
-import java.util.Map;
-import java.util.Set;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static org.slf4j.LoggerFactory.getLogger;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IQueue;
+import com.hazelcast.core.ItemEvent;
+import com.hazelcast.core.ItemListener;
@Component(immediate = true)
@Service
@@ -82,7 +82,7 @@
private HazelcastInstance theInstance;
- private ControllerNode localControllerNode;
+ private NodeId localControllerNodeId;
protected StoreSerializer serializer;
private IntentBatchDelegate delegate;
private InternalLeaderListener leaderListener = new InternalLeaderListener();
@@ -98,7 +98,7 @@
@Activate
public void activate() {
theInstance = storeService.getHazelcastInstance();
- localControllerNode = clusterService.getLocalNode();
+ localControllerNodeId = clusterService.getLocalNode().id();
leadershipService.addListener(leaderListener);
serializer = new KryoSerializer() {
@@ -254,7 +254,7 @@
if (!topic.startsWith(TOPIC_BASE)) {
return; // Not our topic: ignore
}
- if (!event.subject().leader().id().equals(localControllerNode.id())) {
+ if (!event.subject().leader().equals(localControllerNodeId)) {
// run for leadership
getQueue(getAppId(topic));
return; // The event is not about this instance: ignore
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLeadershipManager.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLeadershipManager.java
index cbebcda..b10ac60 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLeadershipManager.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLeadershipManager.java
@@ -10,12 +10,12 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
-import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.Leadership;
import org.onlab.onos.cluster.LeadershipEvent;
import org.onlab.onos.cluster.LeadershipEvent.Type;
import org.onlab.onos.cluster.LeadershipEventListener;
import org.onlab.onos.cluster.LeadershipService;
+import org.onlab.onos.cluster.NodeId;
/**
* A trivial implementation of the leadership service.
@@ -35,8 +35,8 @@
private Map<String, Boolean> elections = new ConcurrentHashMap<>();
@Override
- public ControllerNode getLeader(String path) {
- return elections.get(path) ? clusterService.getLocalNode() : null;
+ public NodeId getLeader(String path) {
+ return elections.get(path) ? clusterService.getLocalNode().id() : null;
}
@Override
@@ -44,7 +44,7 @@
elections.put(path, true);
for (LeadershipEventListener listener : listeners) {
listener.event(new LeadershipEvent(Type.LEADER_ELECTED,
- new Leadership(path, clusterService.getLocalNode(), 0)));
+ new Leadership(path, clusterService.getLocalNode().id(), 0)));
}
}
@@ -53,7 +53,7 @@
elections.remove(path);
for (LeadershipEventListener listener : listeners) {
listener.event(new LeadershipEvent(Type.LEADER_BOOTED,
- new Leadership(path, clusterService.getLocalNode(), 0)));
+ new Leadership(path, clusterService.getLocalNode().id(), 0)));
}
}