Using ClusterCommunicationService instead of ITopic for notifying cluster members of leadership events.
Change-Id: I164f30da436f3e4f65c4e938c25bb2aa2faa16c3
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index 990f259..df746f7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -18,9 +18,6 @@
import com.google.common.collect.Maps;
import com.hazelcast.config.TopicConfig;
import com.hazelcast.core.IAtomicLong;
-import com.hazelcast.core.ITopic;
-import com.hazelcast.core.Message;
-import com.hazelcast.core.MessageListener;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -36,6 +33,10 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.event.AbstractListenerRegistry;
import org.onosproject.event.EventDeliveryService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.hz.StoreService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
@@ -71,8 +72,7 @@
*/
@Component(immediate = true)
@Service
-public class HazelcastLeadershipService implements LeadershipService,
- MessageListener<byte[]> {
+public class HazelcastLeadershipService implements LeadershipService {
private static final Logger log =
LoggerFactory.getLogger(HazelcastLeadershipService.class);
@@ -94,6 +94,9 @@
private static final long NO_TERM = 0;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -107,8 +110,8 @@
private final Map<String, Topic> topics = Maps.newConcurrentMap();
private NodeId localNodeId;
- private ITopic<byte[]> leaderTopic;
- private String leaderTopicRegistrationId;
+ private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
+ new MessageSubject("hz-leadership-events");
@Activate
protected void activate() {
@@ -120,8 +123,8 @@
topicConfig.setGlobalOrderingEnabled(true);
topicConfig.setName(TOPIC_HZ_ID);
storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
- leaderTopic = storeService.getHazelcastInstance().getTopic(TOPIC_HZ_ID);
- leaderTopicRegistrationId = leaderTopic.addMessageListener(this);
+
+ clusterCommunicator.addSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT, new InternalLeadershipEventListener());
log.info("Hazelcast Leadership Service started");
}
@@ -129,7 +132,7 @@
@Deactivate
protected void deactivate() {
eventDispatcher.removeSink(LeadershipEvent.class);
- leaderTopic.removeMessageListener(leaderTopicRegistrationId);
+ clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
for (Topic topic : topics.values()) {
topic.stop();
@@ -194,35 +197,6 @@
listenerRegistry.removeListener(listener);
}
- @Override
- public void onMessage(Message<byte[]> message) {
- LeadershipEvent leadershipEvent =
- SERIALIZER.decode(message.getMessageObject());
-
- log.trace("Leadership Event: time = {} type = {} event = {}",
- leadershipEvent.time(), leadershipEvent.type(),
- leadershipEvent);
-
- //
- // If there is no entry for the topic, then create a new one to
- // keep track of the leadership, but don't run for leadership itself.
- //
- String topicName = leadershipEvent.subject().topic();
- Topic topic = topics.get(topicName);
- if (topic == null) {
- topic = new Topic(topicName);
- Topic oldTopic = topics.putIfAbsent(topicName, topic);
- if (oldTopic == null) {
- // encountered new topic, start periodic processing
- topic.start();
- } else {
- topic = oldTopic;
- }
- }
- topic.receivedLeadershipEvent(leadershipEvent);
- eventDispatcher.post(leadershipEvent);
- }
-
/**
* Class for keeping per-topic information.
*/
@@ -406,7 +380,12 @@
LeadershipEvent.Type.LEADER_REELECTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm));
// Dispatch to all instances
- leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
+
+ clusterCommunicator.broadcastIncludeSelf(
+ new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER.encode(leadershipEvent)));
} else {
//
// Test if time to expire a stale leader
@@ -473,7 +452,11 @@
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm));
- leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
+ clusterCommunicator.broadcastIncludeSelf(
+ new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER.encode(leadershipEvent)));
}
// Sleep forever until interrupted
@@ -497,7 +480,11 @@
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm));
- leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
+ clusterCommunicator.broadcastIncludeSelf(
+ new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER.encode(leadershipEvent)));
leaderLock.unlock();
}
}
@@ -515,4 +502,35 @@
oldTerm, newTerm);
}
}
+
+ private class InternalLeadershipEventListener implements ClusterMessageHandler {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ LeadershipEvent leadershipEvent =
+ SERIALIZER.decode(message.payload());
+
+ log.trace("Leadership Event: time = {} type = {} event = {}",
+ leadershipEvent.time(), leadershipEvent.type(),
+ leadershipEvent);
+ //
+ // If there is no entry for the topic, then create a new one to
+ // keep track of the leadership, but don't run for leadership itself.
+ //
+ String topicName = leadershipEvent.subject().topic();
+ Topic topic = topics.get(topicName);
+ if (topic == null) {
+ topic = new Topic(topicName);
+ Topic oldTopic = topics.putIfAbsent(topicName, topic);
+ if (oldTopic == null) {
+ // encountered new topic, start periodic processing
+ topic.start();
+ } else {
+ topic = oldTopic;
+ }
+ }
+ topic.receivedLeadershipEvent(leadershipEvent);
+ eventDispatcher.post(leadershipEvent);
+ }
+ }
}