Add term information in HazelcastLeadershipService
Implementation based on Hazelcast IAtomicLong.
Change-Id: I9dca40228a84fdb3edf02ffd2cc32d7d38c90378
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index 40e060b..92b8dce 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -15,22 +15,19 @@
*/
package org.onosproject.store.cluster.impl;
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.onlab.util.Tools.namedThreads;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.locks.Lock;
-
+import com.google.common.collect.Maps;
+import com.hazelcast.config.TopicConfig;
+import com.hazelcast.core.IAtomicLong;
+import com.hazelcast.core.ITopic;
+import com.hazelcast.core.Message;
+import com.hazelcast.core.MessageListener;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
@@ -42,15 +39,18 @@
import org.onosproject.store.hz.StoreService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
-import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
-import com.hazelcast.config.TopicConfig;
-import com.hazelcast.core.ITopic;
-import com.hazelcast.core.Message;
-import com.hazelcast.core.MessageListener;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.onlab.util.Tools.namedThreads;
/**
* Distributed implementation of LeadershipService that is based on Hazelcast.
@@ -90,6 +90,9 @@
private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s
private static final String TOPIC_HZ_ID = "LeadershipService/AllTopics";
+ // indicates there is no term value yet
+ private static final long NO_TERM = 0;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -175,7 +178,7 @@
for (Topic topic : topics.values()) {
Leadership leadership = new Leadership(topic.topicName(),
topic.leader(),
- 0L);
+ topic.term());
result.put(topic.topicName(), leadership);
}
return result;
@@ -229,6 +232,12 @@
private volatile long lastLeadershipUpdateMs = 0;
private ExecutorService leaderElectionExecutor;
+ private volatile IAtomicLong term;
+ // This is local state, recording the term number for the last time
+ // this instance was leader for this topic. The current term could be
+ // higher if the mastership has changed one or more times since then.
+ private long myLastLeaderTerm = NO_TERM;
+
private NodeId leader;
private Lock leaderLock;
private Future<?> getLockFuture;
@@ -262,6 +271,18 @@
}
/**
+ * Returns the term number currently recorded for this topic.
+ *
+ * @return the value of the term counter, or NO_TERM if the counter has not been obtained yet
+ */
+ private long term() {
+ if (term != null) {
+ return term.get();
+ }
+ return NO_TERM;
+ }
+
+ /**
* Starts operation.
*/
private void start() {
@@ -290,7 +311,10 @@
start();
}
String lockHzId = "LeadershipService/" + topicName + "/lock";
+ String termHzId = "LeadershipService/" + topicName + "/term";
leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
+ term = storeService.getHazelcastInstance().getAtomicLong(termHzId);
+
getLockFuture = leaderElectionExecutor.submit(new Runnable() {
@Override
public void run() {
@@ -374,7 +398,7 @@
//
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_REELECTED,
- new Leadership(topicName, localNodeId, 0));
+ new Leadership(topicName, localNodeId, myLastLeaderTerm));
// Dispatch to all instances
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
} else {
@@ -386,7 +410,7 @@
if (delta > LEADERSHIP_REMOTE_TIMEOUT_MS) {
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(topicName, leader, 0));
+ new Leadership(topicName, leader, myLastLeaderTerm));
// Dispatch only to the local listener(s)
eventDispatcher.post(leadershipEvent);
leader = null;
@@ -435,10 +459,13 @@
// This instance is now the leader
//
log.info("Leader Elected for topic {}", topicName);
+
+ updateTerm();
+
leader = localNodeId;
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
- new Leadership(topicName, localNodeId, 0));
+ new Leadership(topicName, localNodeId, myLastLeaderTerm));
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
}
@@ -463,11 +490,22 @@
}
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(topicName, localNodeId, 0));
+ new Leadership(topicName, localNodeId, myLastLeaderTerm));
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
leaderLock.unlock();
}
}
+
+ }
+
+ // Globally guarded by the leadership lock for this topic
+ // Locally guarded by synchronized (this)
+ private void updateTerm() {
+ // One atomic increment; the previous value is derived, not re-read
+ long newTerm = term.incrementAndGet();
+ myLastLeaderTerm = newTerm;
+ log.debug("Topic {} updated term from {} to {}", topicName,
+ newTerm - 1, newTerm);
}
}
}