Fix potential race conditions in HazelcastLeadershipService
Change-Id: Iac232652155830c8e054760ea371ffb5639cf464
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index ca23482..990f259 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -166,8 +166,8 @@
checkArgument(path != null);
Topic topic = topics.get(path);
if (topic != null) {
- topic.stop();
topics.remove(path, topic);
+ topic.stop();
}
}
@@ -213,6 +213,7 @@
topic = new Topic(topicName);
Topic oldTopic = topics.putIfAbsent(topicName, topic);
if (oldTopic == null) {
+ // encountered new topic, start periodic processing
topic.start();
} else {
topic = oldTopic;
@@ -285,7 +286,11 @@
/**
* Starts operation.
*/
- private void start() {
+ private synchronized void start() {
+ if (!isShutdown) {
+ // already running
+ return;
+ }
isShutdown = false;
String threadPoolName = "onos-leader-election-" + topicName + "-%d";
leaderElectionExecutor = Executors.newScheduledThreadPool(2,
@@ -303,13 +308,14 @@
/**
* Runs for leadership.
*/
- private void runForLeadership() {
+ private synchronized void runForLeadership() {
if (isRunningForLeadership) {
return; // Nothing to do: already running
}
if (isShutdown) {
start();
}
+ isRunningForLeadership = true;
String lockHzId = "LeadershipService/" + topicName + "/lock";
String termHzId = "LeadershipService/" + topicName + "/term";
leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
@@ -326,7 +332,7 @@
/**
* Stops leadership election for the topic.
*/
- private void stop() {
+ private synchronized void stop() {
isShutdown = true;
isRunningForLeadership = false;
// getLockFuture.cancel(true);
@@ -454,22 +460,22 @@
continue;
}
- synchronized (this) {
- //
- // This instance is now the leader
- //
- log.info("Leader Elected for topic {}", topicName);
-
- updateTerm();
-
- leader = localNodeId;
- leadershipEvent = new LeadershipEvent(
- LeadershipEvent.Type.LEADER_ELECTED,
- new Leadership(topicName, localNodeId, myLastLeaderTerm));
- leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
- }
-
try {
+ synchronized (this) {
+ //
+ // This instance is now the leader
+ //
+ log.info("Leader Elected for topic {}", topicName);
+
+ updateTerm();
+
+ leader = localNodeId;
+ leadershipEvent = new LeadershipEvent(
+ LeadershipEvent.Type.LEADER_ELECTED,
+ new Leadership(topicName, localNodeId, myLastLeaderTerm));
+ leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
+ }
+
// Sleep forever until interrupted
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
@@ -479,23 +485,24 @@
//
log.debug("Leader Interrupted for topic {}",
topicName);
- }
- synchronized (this) {
- // If we reach here, we should release the leadership
- log.debug("Leader Lock Released for topic {}", topicName);
- if ((leader != null) &&
- leader.equals(localNodeId)) {
- leader = null;
+ } finally {
+ synchronized (this) {
+ // If we reach here, we should release the leadership
+ log.debug("Leader Lock Released for topic {}", topicName);
+ if ((leader != null) &&
+ leader.equals(localNodeId)) {
+ leader = null;
+ }
+ leadershipEvent = new LeadershipEvent(
+ LeadershipEvent.Type.LEADER_BOOTED,
+ new Leadership(topicName, localNodeId, myLastLeaderTerm));
+ leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
+ leaderLock.unlock();
}
- leadershipEvent = new LeadershipEvent(
- LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(topicName, localNodeId, myLastLeaderTerm));
- leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
- leaderLock.unlock();
}
}
-
+ isRunningForLeadership = false;
}
// Globally guarded by the leadership lock for this term