Add atomic mastership/term/backups method to MastershipService

Change-Id: I18c3aeaa5101c9ce08ff38fffd70eaec903a0f3e
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
index 3d1c376..4176130 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.incubator.store.virtual.impl;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -34,6 +35,7 @@
 import org.onosproject.incubator.net.virtual.NetworkId;
 import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
 import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipStoreDelegate;
 import org.onosproject.mastership.MastershipTerm;
 import org.onosproject.net.DeviceId;
@@ -45,7 +47,7 @@
 import org.slf4j.Logger;
 
 import java.util.List;
-import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -64,8 +66,6 @@
 
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import static com.google.common.base.Preconditions.checkArgument;
 
 @Component(immediate = true, enabled = false)
@@ -188,30 +188,16 @@
     public RoleInfo getNodes(NetworkId networkId, DeviceId deviceId) {
         checkArgument(networkId != null, NETWORK_ID_NULL);
         checkArgument(deviceId != null, DEVICE_ID_NULL);
+        Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
+        return new RoleInfo(leadership.leaderNodeId(), leadership.candidates());
+    }
 
-        Map<NodeId, MastershipRole> roles = Maps.newHashMap();
-        clusterService.getNodes()
-                .forEach((node) -> roles.put(node.id(),
-                                             getRole(networkId, node.id(), deviceId)));
-
-        NodeId master = null;
-        final List<NodeId> standbys = Lists.newLinkedList();
-
-        List<NodeId> candidates = leadershipService
-                .getCandidates(createDeviceMastershipTopic(networkId, deviceId));
-
-        for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
-            if (entry.getValue() == MastershipRole.MASTER) {
-                master = entry.getKey();
-            } else if (entry.getValue() == MastershipRole.STANDBY) {
-                standbys.add(entry.getKey());
-            }
-        }
-
-        List<NodeId> sortedStandbyList = candidates.stream()
-                .filter(standbys::contains).collect(Collectors.toList());
-
-        return new RoleInfo(master, sortedStandbyList);
+    @Override
+    public MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId) {
+        checkArgument(networkId != null, NETWORK_ID_NULL);
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+        return buildMastershipFromLeadership( // one leadership read feeds term, master and roles
+            leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId)));
     }
 
     @Override
@@ -322,9 +308,8 @@
         MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
                 MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
         leadershipService.withdraw(leadershipTopic);
-        return CompletableFuture.completedFuture(new MastershipEvent(eventType,
-                                                                     deviceId,
-                                                                     getNodes(networkId, deviceId)));
+        return CompletableFuture.completedFuture(
+            new MastershipEvent(eventType, deviceId, getMastership(networkId, deviceId)));
     }
 
     private CompletableFuture<MastershipEvent>
@@ -338,6 +323,24 @@
         // Noop. LeadershipService already takes care of detecting and purging stale locks.
     }
 
+    /** Maps every node to exactly one role, because ImmutableMap.Builder#build rejects duplicate keys. */
+    private MastershipInfo buildMastershipFromLeadership(Leadership leadership) {
+        NodeId master = leadership.leaderNodeId();
+        List<NodeId> candidates = leadership.candidates();
+        ImmutableMap.Builder<NodeId, MastershipRole> builder = ImmutableMap.builder();
+        if (master != null) {
+            builder.put(master, MastershipRole.MASTER);
+        }
+        // The elected leader may also be listed as a candidate; it must be reported as MASTER only.
+        candidates.stream().distinct().filter(nodeId -> !nodeId.equals(master))
+            .forEach(nodeId -> builder.put(nodeId, MastershipRole.STANDBY));
+        clusterService.getNodes().stream().map(node -> node.id())
+            .filter(nodeId -> !nodeId.equals(master) && !candidates.contains(nodeId))
+            .forEach(nodeId -> builder.put(nodeId, MastershipRole.NONE));
+        return new MastershipInfo(leadership.leader() != null ? leadership.leader().term() : 0,
+            Optional.ofNullable(master), builder.build());
+    }
+
     private class InternalDeviceMastershipEventListener
             implements LeadershipEventListener {
 
@@ -357,28 +360,23 @@
 
             NetworkId networkId = extractNetworkIdFromTopic(leadership.topic());
             DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
-
-            RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
-                    getNodes(networkId, deviceId) : new RoleInfo();
+            MastershipInfo mastershipInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
+                ? buildMastershipFromLeadership(event.subject())
+                : new MastershipInfo();
 
             switch (event.type()) {
                 case LEADER_AND_CANDIDATES_CHANGED:
-                    notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
-                                                                  deviceId, roleInfo));
-                    notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
-                                                                  deviceId, roleInfo));
+                    notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
+                    notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
                     break;
                 case LEADER_CHANGED:
-                    notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
-                                                                  deviceId, roleInfo));
+                    notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
                     break;
                 case CANDIDATES_CHANGED:
-                    notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
-                                                                  deviceId, roleInfo));
+                    notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
                     break;
                 case SERVICE_DISRUPTED:
-                    notifyDelegate(networkId, new MastershipEvent(SUSPENDED,
-                                                                  deviceId, roleInfo));
+                    notifyDelegate(networkId, new MastershipEvent(SUSPENDED, deviceId, mastershipInfo));
                     break;
                 case SERVICE_RESTORED:
                     // Do nothing, wait for updates from peers
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java
index 730ebee..346999b 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java
@@ -17,6 +17,7 @@
 package org.onosproject.incubator.store.virtual.impl;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -36,6 +37,7 @@
 import org.onosproject.incubator.net.virtual.NetworkId;
 import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
 import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
 import org.onosproject.mastership.MastershipStoreDelegate;
 import org.onosproject.mastership.MastershipTerm;
 import org.onosproject.net.DeviceId;
@@ -50,6 +52,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -122,7 +125,7 @@
                     // remove from backup list
                     removeFromBackups(networkId, deviceId, node);
                     notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId,
-                                                       getNodes(networkId, deviceId)));
+                        getMastership(networkId, deviceId)));
                     return CompletableFuture.completedFuture(MastershipRole.MASTER);
                 }
                 return CompletableFuture.completedFuture(MastershipRole.STANDBY);
@@ -132,13 +135,13 @@
                     masterMap.put(deviceId, node);
                     incrementTerm(networkId, deviceId);
                     notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId,
-                                                       getNodes(networkId, deviceId)));
+                        getMastership(networkId, deviceId)));
                     return CompletableFuture.completedFuture(MastershipRole.MASTER);
                 }
                 // add to backup list
                 if (addToBackup(networkId, deviceId, node)) {
                     notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId,
-                                                       getNodes(networkId, deviceId)));
+                        getMastership(networkId, deviceId)));
                 }
                 return CompletableFuture.completedFuture(MastershipRole.STANDBY);
             default:
@@ -184,6 +187,27 @@
     }
 
     @Override
+    public MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId) {
+        Map<DeviceId, AtomicInteger> termMap = getTermMap(networkId);
+        NodeId master = getMasterMap(networkId).get(deviceId);
+        // Backups are keyed by device, so the lookup key is deviceId rather than the master node.
+        List<NodeId> standbys = getBackups(networkId).getOrDefault(deviceId, Collections.emptyList());
+        ImmutableMap.Builder<NodeId, MastershipRole> roleBuilder = ImmutableMap.builder();
+        if (master != null) {
+            roleBuilder.put(master, MastershipRole.MASTER);
+        }
+        // Each node gets exactly one role: ImmutableMap.Builder#build throws on duplicate keys.
+        standbys.stream().distinct().filter(nodeId -> !Objects.equals(nodeId, master))
+            .forEach(nodeId -> roleBuilder.put(nodeId, MastershipRole.STANDBY));
+        clusterService.getNodes().stream().map(node -> node.id())
+            .filter(nodeId -> !Objects.equals(nodeId, master) && !standbys.contains(nodeId))
+            .forEach(nodeId -> roleBuilder.put(nodeId, MastershipRole.NONE));
+        return new MastershipInfo(
+            termMap.getOrDefault(deviceId, new AtomicInteger(NOTHING)).get(),
+            Optional.ofNullable(master), roleBuilder.build());
+    }
+
+    @Override
     public Set<DeviceId> getDevices(NetworkId networkId, NodeId nodeId) {
         Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
 
@@ -219,7 +243,7 @@
         }
 
         return CompletableFuture.completedFuture(
-                new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(networkId, deviceId)));
+                new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(networkId, deviceId)));
     }
 
     @Override
@@ -249,14 +273,14 @@
                     // TODO: Should there be new event type for no MASTER?
                     return CompletableFuture.completedFuture(
                             new MastershipEvent(MASTER_CHANGED, deviceId,
-                                                getNodes(networkId, deviceId)));
+                                getMastership(networkId, deviceId)));
                 } else {
                     NodeId prevMaster = masterMap.put(deviceId, backup);
                     incrementTerm(networkId, deviceId);
                     addToBackup(networkId, deviceId, prevMaster);
                     return CompletableFuture.completedFuture(
                             new MastershipEvent(MASTER_CHANGED, deviceId,
-                                                getNodes(networkId, deviceId)));
+                                getMastership(networkId, deviceId)));
                 }
 
             case STANDBY:
@@ -265,7 +289,7 @@
                 if (modified) {
                     return CompletableFuture.completedFuture(
                             new MastershipEvent(BACKUPS_CHANGED, deviceId,
-                                                getNodes(networkId, deviceId)));
+                                getMastership(networkId, deviceId)));
                 }
                 break;
 
@@ -314,13 +338,13 @@
                 incrementTerm(networkId, deviceId);
                 return CompletableFuture.completedFuture(
                         new MastershipEvent(MASTER_CHANGED, deviceId,
-                                            getNodes(networkId, deviceId)));
+                            getMastership(networkId, deviceId)));
 
             case STANDBY:
                 if (removeFromBackups(networkId, deviceId, nodeId)) {
                     return CompletableFuture.completedFuture(
                             new MastershipEvent(BACKUPS_CHANGED, deviceId,
-                                                getNodes(networkId, deviceId)));
+                                getMastership(networkId, deviceId)));
                 }
                 break;