Add term information in HazelcastLeadershipService

Implementation based on Hazelcast IAtomicLong.

Change-Id: I9dca40228a84fdb3edf02ffd2cc32d7d38c90378
diff --git a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
index a6e44a7..0800942 100644
--- a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
@@ -29,15 +29,16 @@
         description = "Finds the leader for particular topic.")
 public class LeaderCommand extends AbstractShellCommand {
 
-    private static final String FMT = "%-20s: %15s";
+    private static final String FMT = "%-20s: %15s %5s";
 
     @Override
     protected void execute() {
         LeadershipService leaderService = get(LeadershipService.class);
         Map<String, Leadership> leaderBoard = leaderService.getLeaderBoard();
-        print(FMT, "Topic", "Leader");
+        print(FMT, "Topic", "Leader", "Epoch");
         for (String topic : leaderBoard.keySet()) {
-            print(FMT, topic, leaderBoard.get(topic).leader());
+            Leadership leadership = leaderBoard.get(topic);
+            print(FMT, topic, leadership.leader(), leadership.epoch());
         }
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index 40e060b..92b8dce 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -15,22 +15,19 @@
  */
 package org.onosproject.store.cluster.impl;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.onlab.util.Tools.namedThreads;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.locks.Lock;
-
+import com.google.common.collect.Maps;
+import com.hazelcast.config.TopicConfig;
+import com.hazelcast.core.IAtomicLong;
+import com.hazelcast.core.ITopic;
+import com.hazelcast.core.Message;
+import com.hazelcast.core.MessageListener;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.LeadershipEvent;
@@ -42,15 +39,18 @@
 import org.onosproject.store.hz.StoreService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
-import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
-import com.hazelcast.config.TopicConfig;
-import com.hazelcast.core.ITopic;
-import com.hazelcast.core.Message;
-import com.hazelcast.core.MessageListener;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.onlab.util.Tools.namedThreads;
 
 /**
  * Distributed implementation of LeadershipService that is based on Hazelcast.
@@ -90,6 +90,9 @@
     private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000;  // 15s
     private static final String TOPIC_HZ_ID = "LeadershipService/AllTopics";
 
+    // indicates there is no term value yet
+    private static final long NO_TERM = 0;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
@@ -175,7 +178,7 @@
         for (Topic topic : topics.values()) {
             Leadership leadership = new Leadership(topic.topicName(),
                                                    topic.leader(),
-                                                   0L);
+                                                   topic.term());
             result.put(topic.topicName(), leadership);
         }
         return result;
@@ -229,6 +232,12 @@
         private volatile long lastLeadershipUpdateMs = 0;
         private ExecutorService leaderElectionExecutor;
 
+        private volatile IAtomicLong term;
+        // This is local state, recording the term number for the last time
+        // this instance was leader for this topic. The current term could be
+        // higher if the mastership has changed any times.
+        private long myLastLeaderTerm = NO_TERM;
+
         private NodeId leader;
         private Lock leaderLock;
         private Future<?> getLockFuture;
@@ -262,6 +271,18 @@
         }
 
         /**
+         * Gets the current term for the topic.
+         *
+         * @return the term for the topic
+         */
+        private long term() {
+            if (term == null) {
+                return NO_TERM;
+            }
+            return term.get();
+        }
+
+        /**
          * Starts operation.
          */
         private void start() {
@@ -290,7 +311,10 @@
                 start();
             }
             String lockHzId = "LeadershipService/" + topicName + "/lock";
+            String termHzId = "LeadershipService/" + topicName + "/term";
             leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
+            term = storeService.getHazelcastInstance().getAtomicLong(termHzId);
+
             getLockFuture = leaderElectionExecutor.submit(new Runnable() {
                     @Override
                     public void run() {
@@ -374,7 +398,7 @@
                             //
                             leadershipEvent = new LeadershipEvent(
                                 LeadershipEvent.Type.LEADER_REELECTED,
-                                new Leadership(topicName, localNodeId, 0));
+                                new Leadership(topicName, localNodeId, myLastLeaderTerm));
                             // Dispatch to all instances
                             leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
                         } else {
@@ -386,7 +410,7 @@
                             if (delta > LEADERSHIP_REMOTE_TIMEOUT_MS) {
                                 leadershipEvent = new LeadershipEvent(
                                         LeadershipEvent.Type.LEADER_BOOTED,
-                                        new Leadership(topicName, leader, 0));
+                                        new Leadership(topicName, leader, myLastLeaderTerm));
                                 // Dispatch only to the local listener(s)
                                 eventDispatcher.post(leadershipEvent);
                                 leader = null;
@@ -435,10 +459,13 @@
                     // This instance is now the leader
                     //
                     log.info("Leader Elected for topic {}", topicName);
+
+                    updateTerm();
+
                     leader = localNodeId;
                     leadershipEvent = new LeadershipEvent(
                         LeadershipEvent.Type.LEADER_ELECTED,
-                        new Leadership(topicName, localNodeId, 0));
+                        new Leadership(topicName, localNodeId, myLastLeaderTerm));
                     leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
                 }
 
@@ -463,11 +490,22 @@
                     }
                     leadershipEvent = new LeadershipEvent(
                                 LeadershipEvent.Type.LEADER_BOOTED,
-                                new Leadership(topicName, localNodeId, 0));
+                                new Leadership(topicName, localNodeId, myLastLeaderTerm));
                     leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
                     leaderLock.unlock();
                 }
             }
+
+        }
+
+        // Globally guarded by the leadership lock for this term
+        // Locally guarded by synchronized (this)
+        private void updateTerm() {
+            long oldTerm = term.get();
+            long newTerm = term.incrementAndGet();
+            myLastLeaderTerm = newTerm;
+            log.debug("Topic {} updated term from {} to {}", topicName,
+                      oldTerm, newTerm);
         }
     }
 }