Using ClusterCommunicationService instead of ITopic for notifying cluster members of leadership events.

Change-Id: I164f30da436f3e4f65c4e938c25bb2aa2faa16c3
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index 990f259..df746f7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -18,9 +18,6 @@
 import com.google.common.collect.Maps;
 import com.hazelcast.config.TopicConfig;
 import com.hazelcast.core.IAtomicLong;
-import com.hazelcast.core.ITopic;
-import com.hazelcast.core.Message;
-import com.hazelcast.core.MessageListener;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -36,6 +33,10 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.event.AbstractListenerRegistry;
 import org.onosproject.event.EventDeliveryService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.hz.StoreService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
@@ -71,8 +72,7 @@
  */
 @Component(immediate = true)
 @Service
-public class HazelcastLeadershipService implements LeadershipService,
-                                        MessageListener<byte[]> {
+public class HazelcastLeadershipService implements LeadershipService {
     private static final Logger log =
         LoggerFactory.getLogger(HazelcastLeadershipService.class);
 
@@ -94,6 +94,9 @@
     private static final long NO_TERM = 0;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -107,8 +110,8 @@
     private final Map<String, Topic> topics = Maps.newConcurrentMap();
     private NodeId localNodeId;
 
-    private ITopic<byte[]> leaderTopic;
-    private String leaderTopicRegistrationId;
+    private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
+            new MessageSubject("hz-leadership-events");
 
     @Activate
     protected void activate() {
@@ -120,8 +123,8 @@
         topicConfig.setGlobalOrderingEnabled(true);
         topicConfig.setName(TOPIC_HZ_ID);
         storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
-        leaderTopic = storeService.getHazelcastInstance().getTopic(TOPIC_HZ_ID);
-        leaderTopicRegistrationId = leaderTopic.addMessageListener(this);
+
+        clusterCommunicator.addSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT, new InternalLeadershipEventListener());
 
         log.info("Hazelcast Leadership Service started");
     }
@@ -129,7 +132,7 @@
     @Deactivate
     protected void deactivate() {
         eventDispatcher.removeSink(LeadershipEvent.class);
-        leaderTopic.removeMessageListener(leaderTopicRegistrationId);
+        clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
 
         for (Topic topic : topics.values()) {
             topic.stop();
@@ -194,35 +197,6 @@
         listenerRegistry.removeListener(listener);
     }
 
-    @Override
-    public void onMessage(Message<byte[]> message) {
-        LeadershipEvent leadershipEvent =
-            SERIALIZER.decode(message.getMessageObject());
-
-        log.trace("Leadership Event: time = {} type = {} event = {}",
-                  leadershipEvent.time(), leadershipEvent.type(),
-                  leadershipEvent);
-
-        //
-        // If there is no entry for the topic, then create a new one to
-        // keep track of the leadership, but don't run for leadership itself.
-        //
-        String topicName = leadershipEvent.subject().topic();
-        Topic topic = topics.get(topicName);
-        if (topic == null) {
-            topic = new Topic(topicName);
-            Topic oldTopic = topics.putIfAbsent(topicName, topic);
-            if (oldTopic == null) {
-                // encountered new topic, start periodic processing
-                topic.start();
-            } else {
-                topic = oldTopic;
-            }
-        }
-        topic.receivedLeadershipEvent(leadershipEvent);
-        eventDispatcher.post(leadershipEvent);
-    }
-
     /**
      * Class for keeping per-topic information.
      */
@@ -406,7 +380,12 @@
                                 LeadershipEvent.Type.LEADER_REELECTED,
                                 new Leadership(topicName, localNodeId, myLastLeaderTerm));
                             // Dispatch to all instances
-                            leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
+
+                            clusterCommunicator.broadcastIncludeSelf(
+                                    new ClusterMessage(
+                                            clusterService.getLocalNode().id(),
+                                            LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                                            SERIALIZER.encode(leadershipEvent)));
                         } else {
                             //
                             // Test if time to expire a stale leader
@@ -473,7 +452,11 @@
                         leadershipEvent = new LeadershipEvent(
                                                               LeadershipEvent.Type.LEADER_ELECTED,
                                                               new Leadership(topicName, localNodeId, myLastLeaderTerm));
-                        leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
+                        clusterCommunicator.broadcastIncludeSelf(
+                                new ClusterMessage(
+                                        clusterService.getLocalNode().id(),
+                                        LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                                        SERIALIZER.encode(leadershipEvent)));
                     }
 
                     // Sleep forever until interrupted
@@ -497,7 +480,11 @@
                         leadershipEvent = new LeadershipEvent(
                                                               LeadershipEvent.Type.LEADER_BOOTED,
                                                               new Leadership(topicName, localNodeId, myLastLeaderTerm));
-                        leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
+                        clusterCommunicator.broadcastIncludeSelf(
+                                new ClusterMessage(
+                                        clusterService.getLocalNode().id(),
+                                        LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                                        SERIALIZER.encode(leadershipEvent)));
                         leaderLock.unlock();
                     }
                 }
@@ -515,4 +502,35 @@
                       oldTerm, newTerm);
         }
     }
+
+    private class InternalLeadershipEventListener implements ClusterMessageHandler {
+
+        @Override
+        public void handle(ClusterMessage message) {
+            LeadershipEvent leadershipEvent =
+                    SERIALIZER.decode(message.payload());
+
+                log.trace("Leadership Event: time = {} type = {} event = {}",
+                          leadershipEvent.time(), leadershipEvent.type(),
+                          leadershipEvent);
+                //
+                // If there is no entry for the topic, then create a new one to
+                // keep track of the leadership, but don't run for leadership itself.
+                //
+                String topicName = leadershipEvent.subject().topic();
+                Topic topic = topics.get(topicName);
+                if (topic == null) {
+                    topic = new Topic(topicName);
+                    Topic oldTopic = topics.putIfAbsent(topicName, topic);
+                    if (oldTopic == null) {
+                        // encountered new topic, start periodic processing
+                        topic.start();
+                    } else {
+                        topic = oldTopic;
+                    }
+                }
+                topic.receivedLeadershipEvent(leadershipEvent);
+                eventDispatcher.post(leadershipEvent);
+        }
+    }
 }