Add atomic mastership/term/backups method to MastershipService

Change-Id: I18c3aeaa5101c9ce08ff38fffd70eaec903a0f3e
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
index 3d1c376..4176130 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.incubator.store.virtual.impl;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -34,6 +35,7 @@
 import org.onosproject.incubator.net.virtual.NetworkId;
 import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
 import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipStoreDelegate;
 import org.onosproject.mastership.MastershipTerm;
 import org.onosproject.net.DeviceId;
@@ -45,7 +47,7 @@
 import org.slf4j.Logger;
 
 import java.util.List;
-import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -64,8 +66,6 @@
 
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import static com.google.common.base.Preconditions.checkArgument;
 
 @Component(immediate = true, enabled = false)
@@ -188,30 +188,16 @@
     public RoleInfo getNodes(NetworkId networkId, DeviceId deviceId) {
         checkArgument(networkId != null, NETWORK_ID_NULL);
         checkArgument(deviceId != null, DEVICE_ID_NULL);
+        Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
+        return new RoleInfo(leadership.leaderNodeId(), leadership.candidates());
+    }
 
-        Map<NodeId, MastershipRole> roles = Maps.newHashMap();
-        clusterService.getNodes()
-                .forEach((node) -> roles.put(node.id(),
-                                             getRole(networkId, node.id(), deviceId)));
-
-        NodeId master = null;
-        final List<NodeId> standbys = Lists.newLinkedList();
-
-        List<NodeId> candidates = leadershipService
-                .getCandidates(createDeviceMastershipTopic(networkId, deviceId));
-
-        for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
-            if (entry.getValue() == MastershipRole.MASTER) {
-                master = entry.getKey();
-            } else if (entry.getValue() == MastershipRole.STANDBY) {
-                standbys.add(entry.getKey());
-            }
-        }
-
-        List<NodeId> sortedStandbyList = candidates.stream()
-                .filter(standbys::contains).collect(Collectors.toList());
-
-        return new RoleInfo(master, sortedStandbyList);
+    @Override
+    public MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId) {
+        checkArgument(networkId != null, NETWORK_ID_NULL);
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+        Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
+        return buildMastershipFromLeadership(leadership);
     }
 
     @Override
@@ -322,9 +308,8 @@
         MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
                 MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
         leadershipService.withdraw(leadershipTopic);
-        return CompletableFuture.completedFuture(new MastershipEvent(eventType,
-                                                                     deviceId,
-                                                                     getNodes(networkId, deviceId)));
+        return CompletableFuture.completedFuture(
+            new MastershipEvent(eventType, deviceId, getMastership(networkId, deviceId)));
     }
 
     private CompletableFuture<MastershipEvent>
@@ -338,6 +323,24 @@
         // Noop. LeadershipService already takes care of detecting and purging stale locks.
     }
 
+    private MastershipInfo buildMastershipFromLeadership(Leadership leadership) {
+        ImmutableMap.Builder<NodeId, MastershipRole> builder = ImmutableMap.builder();
+        if (leadership.leaderNodeId() != null) {
+            builder.put(leadership.leaderNodeId(), MastershipRole.MASTER);
+        }
+        leadership.candidates().forEach(nodeId -> builder.put(nodeId, MastershipRole.STANDBY));
+        clusterService.getNodes().stream()
+            .filter(node -> !leadership.candidates().contains(node.id()))
+            .forEach(node -> builder.put(node.id(), MastershipRole.NONE));
+
+        return new MastershipInfo(
+            leadership.leader() != null ? leadership.leader().term() : 0,
+            leadership.leader() != null
+                ? Optional.of(leadership.leader().nodeId())
+                : Optional.empty(),
+            builder.build());
+    }
+
     private class InternalDeviceMastershipEventListener
             implements LeadershipEventListener {
 
@@ -357,28 +360,23 @@
 
             NetworkId networkId = extractNetworkIdFromTopic(leadership.topic());
             DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
-
-            RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
-                    getNodes(networkId, deviceId) : new RoleInfo();
+            MastershipInfo mastershipInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
+                ? buildMastershipFromLeadership(event.subject())
+                : new MastershipInfo();
 
             switch (event.type()) {
                 case LEADER_AND_CANDIDATES_CHANGED:
-                    notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
-                                                                  deviceId, roleInfo));
-                    notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
-                                                                  deviceId, roleInfo));
+                    notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
+                    notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
                     break;
                 case LEADER_CHANGED:
-                    notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
-                                                                  deviceId, roleInfo));
+                    notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
                     break;
                 case CANDIDATES_CHANGED:
-                    notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
-                                                                  deviceId, roleInfo));
+                    notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
                     break;
                 case SERVICE_DISRUPTED:
-                    notifyDelegate(networkId, new MastershipEvent(SUSPENDED,
-                                                                  deviceId, roleInfo));
+                    notifyDelegate(networkId, new MastershipEvent(SUSPENDED, deviceId, mastershipInfo));
                     break;
                 case SERVICE_RESTORED:
                     // Do nothing, wait for updates from peers