Add atomic mastership/term/backups method to MastershipService

Change-Id: I18c3aeaa5101c9ce08ff38fffd70eaec903a0f3e
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/ReplicaInfo.java b/core/store/dist/src/main/java/org/onosproject/store/flow/ReplicaInfo.java
index 634582a..feb517c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/ReplicaInfo.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/ReplicaInfo.java
@@ -29,21 +29,33 @@
  */
 public final class ReplicaInfo {
 
+    private final long term;
     private final Optional<NodeId> master;
     private final List<NodeId> backups;
 
     /**
      * Creates a ReplicaInfo instance.
      *
+     * @param term monotonically increasing unique mastership term
      * @param master NodeId of the node where the master copy should be
      * @param backups list of NodeId, where backup copies should be placed
      */
-    public ReplicaInfo(NodeId master, List<NodeId> backups) {
+    public ReplicaInfo(long term, NodeId master, List<NodeId> backups) {
+        this.term = term;
         this.master = Optional.ofNullable(master);
         this.backups = checkNotNull(backups);
     }
 
     /**
+     * Returns the mastership term this instance was created with.
+     *
+     * @return the mastership term; 0 for the instance created by the serializer constructor
+     */
+    public long term() {
+        return term;
+    }
+
+    /**
      * Returns the NodeId, if there is a Node where the master copy should be.
      *
      * @return NodeId, where the master copy should be placed
@@ -78,6 +90,7 @@
 
     // for Serializer
     private ReplicaInfo() {
+        this.term = 0;
         this.master = Optional.empty();
         this.backups = Collections.emptyList();
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
index 79c9147..cfa998d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
@@ -15,18 +15,16 @@
  */
 package org.onosproject.store.flow.impl;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
 import org.onosproject.event.EventDeliveryService;
 import org.onosproject.event.ListenerRegistry;
 import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipListener;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
@@ -36,8 +34,6 @@
 import org.onosproject.store.flow.ReplicaInfoService;
 import org.slf4j.Logger;
 
-import java.util.Collections;
-import java.util.List;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onosproject.store.flow.ReplicaInfoEvent.Type.BACKUPS_CHANGED;
 import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
@@ -79,7 +75,7 @@
 
     @Override
     public ReplicaInfo getReplicaInfoFor(DeviceId deviceId) {
-        return buildFromRoleInfo(mastershipService.getNodesFor(deviceId));
+        return buildFromRoleInfo(mastershipService.getMastershipFor(deviceId));
     }
 
     @Override
@@ -92,17 +88,25 @@
         listenerRegistry.removeListener(checkNotNull(listener));
     }
 
-    private static ReplicaInfo buildFromRoleInfo(RoleInfo roles) {
-        List<NodeId> backups = roles.backups() == null ?
-            Collections.emptyList() : ImmutableList.copyOf(roles.backups());
-        return new ReplicaInfo(roles.master(), backups);
+    /**
+     * Converts mastership information into a {@link ReplicaInfo} with the same term, master and backups.
+     *
+     * @param mastership mastership information to convert
+     * @return replica info mirroring the given mastership
+     */
+    private static ReplicaInfo buildFromRoleInfo(MastershipInfo mastership) {
+        // ReplicaInfo accepts a nullable master, so the Optional is unwrapped here.
+        return new ReplicaInfo(
+                mastership.term(),
+                mastership.master().orElse(null),
+                mastership.backups());
     }
 
     final class InternalMastershipListener implements MastershipListener {
 
         @Override
         public void event(MastershipEvent event) {
-            final ReplicaInfo replicaInfo = buildFromRoleInfo(event.roleInfo());
+            final ReplicaInfo replicaInfo = buildFromRoleInfo(event.mastershipInfo());
             switch (event.type()) {
                 case MASTER_CHANGED:
                     eventDispatcher.post(new ReplicaInfoEvent(MASTER_CHANGED,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 1b406e7..826665d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -23,7 +23,8 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.util.List;
-import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -34,6 +35,7 @@
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -50,6 +52,7 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.RoleInfo;
 import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipStore;
 import org.onosproject.mastership.MastershipStoreDelegate;
 import org.onosproject.mastership.MastershipTerm;
@@ -62,10 +65,7 @@
 import org.onosproject.store.service.Serializer;
 import org.slf4j.Logger;
 
-import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 /**
  * Implementation of the MastershipStore on top of Leadership Service.
@@ -158,7 +158,7 @@
         NodeId leader = leadership == null ? null : leadership.leaderNodeId();
         List<NodeId> candidates = leadership == null ?
                 ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
-        MastershipRole role = Objects.equal(localNodeId, leader) ?
+        MastershipRole role = Objects.equals(localNodeId, leader) ?
                 MastershipRole.MASTER : candidates.contains(localNodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
         return CompletableFuture.completedFuture(role);
     }
@@ -173,7 +173,7 @@
         NodeId leader = leadership == null ? null : leadership.leaderNodeId();
         List<NodeId> candidates = leadership == null ?
                 ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
-        return Objects.equal(nodeId, leader) ?
+        return Objects.equals(nodeId, leader) ?
                 MastershipRole.MASTER : candidates.contains(nodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
     }
 
@@ -187,27 +187,24 @@
     @Override
     public RoleInfo getNodes(DeviceId deviceId) {
         checkArgument(deviceId != null, DEVICE_ID_NULL);
+        Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(deviceId));
+        if (leadership == null) {
+            // No leadership known for this device's topic: report an empty role set instead of failing.
+            return new RoleInfo();
+        }
+        NodeId master = leadership.leaderNodeId();
+        // The candidate list may contain the leader; the master must not be listed as its own backup.
+        List<NodeId> backups = leadership.candidates().stream()
+                .filter(candidate -> !Objects.equals(master, candidate))
+                .collect(Collectors.toList());
+        return new RoleInfo(master, backups);
+    }
 
-        Map<NodeId, MastershipRole> roles = Maps.newHashMap();
-        clusterService.getNodes()
-                      .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
-
-        NodeId master = null;
-        final List<NodeId> standbys = Lists.newLinkedList();
-
-        List<NodeId> candidates = leadershipService.getCandidates(createDeviceMastershipTopic(deviceId));
-
-        for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
-            if (entry.getValue() == MastershipRole.MASTER) {
-                master = entry.getKey();
-            } else if (entry.getValue() == MastershipRole.STANDBY) {
-                standbys.add(entry.getKey());
-            }
-        }
-
-        List<NodeId> sortedStandbyList = candidates.stream().filter(standbys::contains).collect(Collectors.toList());
-
-        return new RoleInfo(master, sortedStandbyList);
+    @Override
+    public MastershipInfo getMastership(DeviceId deviceId) {
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+        Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(deviceId));
+        return buildMastershipFromLeadership(leadership);
     }
 
     @Override
@@ -263,7 +251,7 @@
         List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
 
         NodeId newMaster = candidates.stream()
-                                     .filter(candidate -> !Objects.equal(nodeId, candidate))
+                                     .filter(candidate -> !Objects.equals(nodeId, candidate))
                                      .findFirst()
                                      .orElse(null);
         log.info("Transitioning to role {} for {}. Next master: {}",
@@ -304,7 +292,7 @@
         MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
                 MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
         leadershipService.withdraw(leadershipTopic);
-        return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
+        return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getMastership(deviceId)));
     }
 
     @Override
@@ -312,6 +300,45 @@
         // Noop. LeadershipService already takes care of detecting and purging stale locks.
     }
 
+    /**
+     * Builds the mastership state described by a leadership.
+     * <p>
+     * The leader is mapped to {@code MASTER}, the other candidates to {@code STANDBY} and all
+     * remaining cluster nodes to {@code NONE}; the term and master are taken from the leader.
+     *
+     * @param leadership leadership of a device mastership topic, may be null
+     * @return mastership info for the leadership, or a default-constructed one when leadership is null
+     */
+    private MastershipInfo buildMastershipFromLeadership(Leadership leadership) {
+        if (leadership == null) {
+            // Other lookups in this class null-check the leadership; do the same here to avoid an NPE.
+            return new MastershipInfo();
+        }
+
+        NodeId leaderId = leadership.leaderNodeId();
+        List<NodeId> candidates = leadership.candidates();
+
+        ImmutableMap.Builder<NodeId, MastershipRole> roles = ImmutableMap.builder();
+        if (leaderId != null) {
+            roles.put(leaderId, MastershipRole.MASTER);
+        }
+        // The candidate list may contain the leader; skip it so its key is not added twice.
+        candidates.stream()
+            .filter(nodeId -> !Objects.equals(leaderId, nodeId))
+            .forEach(nodeId -> roles.put(nodeId, MastershipRole.STANDBY));
+        clusterService.getNodes().stream()
+            .filter(node -> !Objects.equals(leaderId, node.id()))
+            .filter(node -> !candidates.contains(node.id()))
+            .forEach(node -> roles.put(node.id(), MastershipRole.NONE));
+
+        return new MastershipInfo(
+            leadership.leader() != null ? leadership.leader().term() : 0,
+            leadership.leader() != null
+                ? Optional.of(leadership.leader().nodeId())
+                : Optional.empty(),
+            roles.build());
+    }
+
     private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
 
         @Override
@@ -328,27 +337,23 @@
         private void handleEvent(LeadershipEvent event) {
             Leadership leadership = event.subject();
             DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
-            NodeId master = event.subject().leaderNodeId();
-            List<NodeId> backups = event.subject().candidates()
-                    .stream()
-                    .filter(n -> !n.equals(master))
-                    .collect(Collectors.toList());
-            RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
-                    ? new RoleInfo(master, backups)
-                    : new RoleInfo();
+            MastershipInfo mastershipInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
+                ? buildMastershipFromLeadership(event.subject())
+                : new MastershipInfo();
+
             switch (event.type()) {
                 case LEADER_AND_CANDIDATES_CHANGED:
-                    notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
-                    notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
+                    notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
+                    notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
                     break;
                 case LEADER_CHANGED:
-                    notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
+                    notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
                     break;
                 case CANDIDATES_CHANGED:
-                    notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
+                    notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
                     break;
                 case SERVICE_DISRUPTED:
-                    notifyDelegate(new MastershipEvent(SUSPENDED, deviceId, roleInfo));
+                    notifyDelegate(new MastershipEvent(SUSPENDED, deviceId, mastershipInfo));
                     break;
                 case SERVICE_RESTORED:
                     // Do nothing, wait for updates from peers
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
index 9709643..992b77b 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
@@ -15,32 +15,33 @@
  */
 package org.onosproject.store.flow.impl;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
 import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.event.ListenerRegistry;
 import org.onosproject.mastership.MastershipEvent;
 import org.onosproject.mastership.MastershipEvent.Type;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipListener;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.mastership.MastershipServiceAdapter;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
 import org.onosproject.store.flow.ReplicaInfo;
 import org.onosproject.store.flow.ReplicaInfoEvent;
 import org.onosproject.store.flow.ReplicaInfoEventListener;
 import org.onosproject.store.flow.ReplicaInfoService;
 
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -101,7 +102,7 @@
 
         // fake MastershipEvent
         eventDispatcher.post(new MastershipEvent(Type.MASTER_CHANGED, DID1,
-            new RoleInfo(NID1, new LinkedList<>())));
+            new MastershipInfo(1, Optional.of(NID1), ImmutableMap.of(NID1, MastershipRole.MASTER))));
 
         assertTrue(latch.await(1, TimeUnit.SECONDS));
     }
@@ -149,8 +150,13 @@
         }
 
         @Override
-        public RoleInfo getNodesFor(DeviceId deviceId) {
-            return new RoleInfo(masters.get(deviceId), Collections.emptyList());
+        public MastershipInfo getMastershipFor(DeviceId deviceId) {
+            // Derive the role map from the configured master so the master and roles never disagree.
+            NodeId master = masters.get(deviceId);
+            Map<NodeId, MastershipRole> roles = master == null
+                ? ImmutableMap.of()
+                : ImmutableMap.of(master, MastershipRole.MASTER);
+            return new MastershipInfo(1, Optional.ofNullable(master), roles);
         }
 
         @Override