Updated the HazelcastLeadershipService implemenation:
* Every listener receives all leadership events, even for new topics and
topics the local instance is not running for a leadership election
* Now getLeaderBoard() returns all leadership info
Change-Id: Ia11a10ed287d2f8d905dd987beb8052c35be6cf1
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/HazelcastLeadershipService.java
index 8e2a7ee..b3cd370 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/HazelcastLeadershipService.java
@@ -71,7 +71,8 @@
*/
@Component(immediate = true)
@Service
-public class HazelcastLeadershipService implements LeadershipService {
+public class HazelcastLeadershipService implements LeadershipService,
+ MessageListener<byte[]> {
private static final Logger log =
LoggerFactory.getLogger(HazelcastLeadershipService.class);
@@ -87,6 +88,7 @@
private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s
private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s
+ private static final String TOPIC_HZ_ID = "LeadershipService/AllTopics";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -102,18 +104,29 @@
private final Map<String, Topic> topics = Maps.newConcurrentMap();
private NodeId localNodeId;
+ private ITopic<byte[]> leaderTopic;
+ private String leaderTopicRegistrationId;
+
@Activate
protected void activate() {
localNodeId = clusterService.getLocalNode().id();
listenerRegistry = new AbstractListenerRegistry<>();
eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
+ TopicConfig topicConfig = new TopicConfig();
+ topicConfig.setGlobalOrderingEnabled(true);
+ topicConfig.setName(TOPIC_HZ_ID);
+ storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
+ leaderTopic = storeService.getHazelcastInstance().getTopic(TOPIC_HZ_ID);
+ leaderTopicRegistrationId = leaderTopic.addMessageListener(this);
+
log.info("Hazelcast Leadership Service started");
}
@Deactivate
protected void deactivate() {
eventDispatcher.removeSink(LeadershipEvent.class);
+ leaderTopic.removeMessageListener(leaderTopicRegistrationId);
for (Topic topic : topics.values()) {
topic.stop();
@@ -139,6 +152,9 @@
Topic oldTopic = topics.putIfAbsent(path, topic);
if (oldTopic == null) {
topic.start();
+ topic.runForLeadership();
+ } else {
+ oldTopic.runForLeadership();
}
}
@@ -156,11 +172,6 @@
public Map<String, Leadership> getLeaderBoard() {
Map<String, Leadership> result = new HashMap<>();
- //
- // Get the leaders for the topics.
- // NOTE: A topic is listed only if this instance is running for
- // a leadership for that topic.
- //
for (Topic topic : topics.values()) {
Leadership leadership = new Leadership(topic.topicName(),
topic.leader(),
@@ -180,12 +191,41 @@
listenerRegistry.removeListener(listener);
}
+ @Override
+ public void onMessage(Message<byte[]> message) {
+ LeadershipEvent leadershipEvent =
+ SERIALIZER.decode(message.getMessageObject());
+
+ log.debug("Leadership Event: time = {} type = {} event = {}",
+ leadershipEvent.time(), leadershipEvent.type(),
+ leadershipEvent);
+
+ //
+ // If there is no entry for the topic, then create a new one to
+ // keep track of the leadership, but don't run for leadership itself.
+ //
+ String topicName = leadershipEvent.subject().topic();
+ Topic topic = topics.get(topicName);
+ if (topic == null) {
+ topic = new Topic(topicName);
+ Topic oldTopic = topics.putIfAbsent(topicName, topic);
+ if (oldTopic == null) {
+ topic.start();
+ } else {
+ topic = oldTopic;
+ }
+ }
+ topic.receivedLeadershipEvent(leadershipEvent);
+ eventDispatcher.post(leadershipEvent);
+ }
+
/**
* Class for keeping per-topic information.
*/
- private final class Topic implements MessageListener<byte[]> {
+ private final class Topic {
private final String topicName;
private volatile boolean isShutdown = true;
+ private volatile boolean isRunningForLeadership = false;
private volatile long lastLeadershipUpdateMs = 0;
private ExecutorService leaderElectionExecutor;
@@ -193,8 +233,6 @@
private Lock leaderLock;
private Future<?> getLockFuture;
private Future<?> periodicProcessingFuture;
- private ITopic<byte[]> leaderTopic;
- private String leaderTopicRegistrationId;
/**
* Constructor.
@@ -224,33 +262,14 @@
}
/**
- * Starts leadership election for the topic.
+ * Starts operation.
*/
private void start() {
isShutdown = false;
- String lockHzId = "LeadershipService/" + topicName + "/lock";
- String topicHzId = "LeadershipService/" + topicName + "/topic";
- leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
-
String threadPoolName = "leader-election-" + topicName + "-%d";
leaderElectionExecutor = Executors.newScheduledThreadPool(2,
namedThreads(threadPoolName));
- TopicConfig topicConfig = new TopicConfig();
- topicConfig.setGlobalOrderingEnabled(true);
- topicConfig.setName(topicHzId);
- storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
-
- leaderTopic =
- storeService.getHazelcastInstance().getTopic(topicHzId);
- leaderTopicRegistrationId = leaderTopic.addMessageListener(this);
-
- getLockFuture = leaderElectionExecutor.submit(new Runnable() {
- @Override
- public void run() {
- doLeaderElectionThread();
- }
- });
periodicProcessingFuture =
leaderElectionExecutor.submit(new Runnable() {
@Override
@@ -261,25 +280,43 @@
}
/**
+ * Runs for leadership.
+ */
+ private void runForLeadership() {
+ if (isRunningForLeadership) {
+ return; // Nothing to do: already running
+ }
+ if (isShutdown) {
+ start();
+ }
+ String lockHzId = "LeadershipService/" + topicName + "/lock";
+ leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
+ getLockFuture = leaderElectionExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ doLeaderElectionThread();
+ }
+ });
+ }
+
+ /**
* Stops leadership election for the topic.
*/
private void stop() {
isShutdown = true;
- leaderTopic.removeMessageListener(leaderTopicRegistrationId);
+ isRunningForLeadership = false;
// getLockFuture.cancel(true);
// periodicProcessingFuture.cancel(true);
leaderElectionExecutor.shutdownNow();
}
- @Override
- public void onMessage(Message<byte[]> message) {
- LeadershipEvent leadershipEvent =
- SERIALIZER.decode(message.getMessageObject());
+ /**
+ * Received a Leadership Event.
+ *
+ * @param leadershipEvent the received Leadership Event
+ */
+ private void receivedLeadershipEvent(LeadershipEvent leadershipEvent) {
NodeId eventLeaderId = leadershipEvent.subject().leader();
-
- log.debug("Leadership Event: time = {} type = {} event = {}",
- leadershipEvent.time(), leadershipEvent.type(),
- leadershipEvent);
if (!leadershipEvent.subject().topic().equals(topicName)) {
return; // Not our topic: ignore
}
@@ -296,23 +333,21 @@
// Another leader: if we are also a leader, then give up
// leadership and run for re-election.
//
- if ((leader != null) &&
- leader.equals(localNodeId)) {
- getLockFuture.cancel(true);
+ if ((leader != null) && leader.equals(localNodeId)) {
+ if (getLockFuture != null) {
+ getLockFuture.cancel(true);
+ }
} else {
// Just update the current leader
leader = leadershipEvent.subject().leader();
lastLeadershipUpdateMs = System.currentTimeMillis();
}
- eventDispatcher.post(leadershipEvent);
break;
case LEADER_BOOTED:
// Remove the state for the current leader
- if ((leader != null) &&
- eventLeaderId.equals(leader)) {
+ if ((leader != null) && eventLeaderId.equals(leader)) {
leader = null;
}
- eventDispatcher.post(leadershipEvent);
break;
default:
break;
@@ -340,7 +375,7 @@
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_REELECTED,
new Leadership(topicName, localNodeId, 0));
- // Dispatch to all remote instances
+ // Dispatch to all instances
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
} else {
//
@@ -404,7 +439,6 @@
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
new Leadership(topicName, localNodeId, 0));
- eventDispatcher.post(leadershipEvent);
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
}
@@ -430,7 +464,6 @@
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(topicName, localNodeId, 0));
- eventDispatcher.post(leadershipEvent);
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
leaderLock.unlock();
}