ConsistentDeviceMastership on top of LeadershipService, and leaders CLI command
modified to filter on topic. This does not support changing candidate ordering
(yet).

Refernce: ONOS-76

Change-Id: I028a6df0acbe3c4e4ff7c228f687f640e48e13be
diff --git a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
index b8cb945..a4b0d3a 100644
--- a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
@@ -18,7 +18,9 @@
 import java.util.Comparator;
 import java.util.Map;
 import java.util.List;
+import java.util.regex.Pattern;
 
+import org.apache.karaf.shell.commands.Argument;
 import org.apache.karaf.shell.commands.Command;
 import org.apache.karaf.shell.commands.Option;
 import org.onlab.util.Tools;
@@ -40,6 +42,12 @@
 
     private static final String FMT = "%-20s | %-15s | %-6s | %-10s |";
     private static final String FMT_C = "%-20s | %-15s | %-19s |";
+    private boolean allTopics;
+    private Pattern pattern;
+
+    @Argument(index = 0, name = "topic", description = "A leadership topic. Can be a regex",
+            required = false, multiValued = false)
+    String topicPattern = null;
 
     @Option(name = "-c", aliases = "--candidates",
             description = "List candidate Nodes for each topic's leadership race",
@@ -75,6 +83,7 @@
 
         leaderBoard.values()
                 .stream()
+                .filter(l -> allTopics || pattern.matcher(l.topic()).matches())
                 .sorted(leadershipComparator)
                 .forEach(l -> print(FMT,
                         l.topic(),
@@ -92,6 +101,7 @@
         leaderBoard
                 .values()
                 .stream()
+                .filter(l -> allTopics || pattern.matcher(l.topic()).matches())
                 .sorted(leadershipComparator)
                 .forEach(l -> {
                         List<NodeId> list = candidates.get(l.topic()).candidates();
@@ -135,6 +145,13 @@
         LeadershipService leaderService = get(LeadershipService.class);
         Map<String, Leadership> leaderBoard = leaderService.getLeaderBoard();
 
+        if (topicPattern == null) {
+            allTopics = true;
+        } else {
+            allTopics = false;
+            pattern = Pattern.compile(topicPattern);
+        }
+
         if (outputJson()) {
             print("%s", json(leaderBoard));
         } else {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 303129d..ed610ad 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -362,8 +362,8 @@
         Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
         final MutableBoolean updated = new MutableBoolean(false);
         candidateBoard.compute(path, (k, current) -> {
-            if (current != null && current.epoch() == newInfo.epoch()) {
-                log.info("updating candidateboard with {}", newInfo);
+            if (current != null && current.epoch() <= newInfo.epoch()) {
+                log.info("updating candidateboard with removal of {}", newInfo);
                 updated.setTrue();
                 if (candidates.isEmpty()) {
                     return null;
@@ -452,7 +452,7 @@
                 });
             } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
                 leaderBoard.compute(topic, (k, currentLeadership) -> {
-                    if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
+                    if (currentLeadership == null || currentLeadership.epoch() == leadershipUpdate.epoch()) {
                         updateAccepted.setTrue();
                         return null;
                     }
@@ -462,6 +462,9 @@
                 candidateBoard.compute(topic, (k, currentInfo) -> {
                     if (currentInfo == null || currentInfo.epoch() <= leadershipUpdate.epoch()) {
                         updateAccepted.setTrue();
+                        if (leadershipUpdate.candidates().isEmpty()) {
+                            return null;
+                        }
                         return leadershipUpdate;
                     }
                     return currentInfo;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 8a54502..27a33b3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.mastership.impl;
 
 import static org.onlab.util.Tools.groupedThreads;
+import static org.onlab.util.Tools.futureGetOrElse;
 import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
 import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -25,12 +26,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -182,12 +179,12 @@
                 return MastershipRole.NONE;
             }
         }
-        MastershipRole role = complete(clusterCommunicator.sendAndReceive(
+        MastershipRole role = futureGetOrElse(clusterCommunicator.sendAndReceive(
                 deviceId,
                 ROLE_QUERY_SUBJECT,
                 SERIALIZER::encode,
                 SERIALIZER::decode,
-                nodeId));
+                nodeId), null);
         return role == null ? MastershipRole.NONE : role;
     }
 
@@ -270,12 +267,12 @@
         if (!nodeId.equals(localNodeId)) {
             log.debug("Forwarding request to relinquish "
                     + "role for device {} to {}", deviceId, nodeId);
-            return complete(clusterCommunicator.sendAndReceive(
+            return futureGetOrElse(clusterCommunicator.sendAndReceive(
                     deviceId,
                     ROLE_RELINQUISH_SUBJECT,
                     SERIALIZER::encode,
                     SERIALIZER::decode,
-                    nodeId));
+                    nodeId), null);
         }
 
         // Check if this node is can be managed by this node.
@@ -374,16 +371,4 @@
         return m.matches();
     }
 
-    private <T> T complete(Future<byte[]> future) {
-        try {
-            return SERIALIZER.decode(future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            log.error("Interrupted while waiting for operation to complete.", e);
-            return null;
-        } catch (TimeoutException | ExecutionException e) {
-            log.error("Failed remote operation", e);
-            return null;
-        }
-    }
 }
\ No newline at end of file