Fix balance-masters functionality in the new LeadershipService based device mastership store

Change-Id: I9f64d514cee7d5a5383fd4c2fa30a8616c97785c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 5606c60..d7d2bc0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -93,6 +93,8 @@
             new MessageSubject("mastership-store-device-role-query");
     private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
             new MessageSubject("mastership-store-device-role-relinquish");
+    private static final MessageSubject MASTERSHIP_RELINQUISH_SUBJECT =
+            new MessageSubject("mastership-store-device-mastership-relinquish");
 
     private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
             Pattern.compile("device:(.*)");
@@ -111,6 +113,7 @@
                     .register(KryoNamespaces.API)
                     .register(MastershipRole.class)
                     .register(MastershipEvent.class)
+                    .register(MastershipEvent.Type.class)
                     .build();
         }
     };
@@ -125,6 +128,11 @@
         clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
                new RoleRelinquishHandler(),
                messageHandlingExecutor);
+        clusterCommunicator.addSubscriber(MASTERSHIP_RELINQUISH_SUBJECT,
+                SERIALIZER::decode,
+                this::relinquishMastership,
+                SERIALIZER::encode,
+                messageHandlingExecutor);
         localNodeId = clusterService.getLocalNode().id();
         leadershipService.addListener(leadershipEventListener);
 
@@ -135,6 +143,7 @@
     public void deactivate() {
         clusterCommunicator.removeSubscriber(ROLE_QUERY_SUBJECT);
         clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
+        clusterCommunicator.removeSubscriber(MASTERSHIP_RELINQUISH_SUBJECT);
         messageHandlingExecutor.shutdown();
         leadershipService.removeListener(leadershipEventListener);
 
@@ -237,7 +246,22 @@
         checkArgument(nodeId != null, NODE_ID_NULL);
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
-        throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
+        NodeId currentMaster = getMaster(deviceId);
+        if (nodeId.equals(currentMaster)) {
+            return null;
+        } else {
+            String leadershipTopic = createDeviceMastershipTopic(deviceId);
+            List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
+            if (candidates.isEmpty()) {
+                return null;
+            }
+            if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
+                return relinquishMastership(deviceId);
+            } else {
+                log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
+            }
+        }
+        return null;
     }
 
     @Override
@@ -254,7 +278,13 @@
         checkArgument(nodeId != null, NODE_ID_NULL);
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
-        throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
+        NodeId currentMaster = getMaster(deviceId);
+        if (!nodeId.equals(currentMaster)) {
+            return null;
+        }
+        // FIXME: This node can become the master again unless it
+        // is demoted to the end of the candidates list.
+        return relinquishMastership(deviceId);
     }
 
     @Override
@@ -294,6 +324,37 @@
         return new MastershipEvent(eventType, deviceId, getNodes(deviceId));
     }
 
+    /**
+     * Relinquishes mastership of a device by stepping down from its leadership topic.
+     * The request is forwarded to the current master when that is a remote node.
+     *
+     * @param deviceId device whose mastership is relinquished; must not be null
+     * @return the result of the forwarded request when the master is remote, an event after
+     *         a successful local stepdown, or null when there is no master or stepdown fails
+     */
+    private MastershipEvent relinquishMastership(DeviceId deviceId) {
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+        NodeId currentMaster = getMaster(deviceId);
+        if (currentMaster == null) {
+            return null;
+        }
+        if (!currentMaster.equals(localNodeId)) {
+            log.info("Forwarding request to relinquish "
+                    + "mastership for device {} to {}", deviceId, currentMaster);
+            return futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId, MASTERSHIP_RELINQUISH_SUBJECT,
+                    SERIALIZER::encode, SERIALIZER::decode, currentMaster), null);
+        }
+        String leadershipTopic = createDeviceMastershipTopic(deviceId);
+        Leadership currentLeadership = leadershipService.getLeadership(leadershipTopic);
+        // localNodeId is non-null here (it equals currentMaster), so use it as the receiver:
+        // leader() is not guaranteed non-null from this view and must not cause an NPE.
+        boolean localIsLeader = currentLeadership != null && localNodeId.equals(currentLeadership.leader());
+        // NOTE(review): the event type stays null when this node is not the recorded leader.
+        MastershipEvent.Type eventType = localIsLeader ? MastershipEvent.Type.MASTER_CHANGED : null;
+        return leadershipService.stepdown(leadershipTopic)
+                ? new MastershipEvent(eventType, deviceId, getNodes(deviceId)) : null;
+    }
+
     private class RoleQueryHandler implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {