AsyncLeaderElector APIs in support for mastership balancing

Change-Id: Ia235c6a18c54490dc49ca13e2caebf70b750dbc7
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultLeaderElector.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultLeaderElector.java
index e7a7f82..1f1e1f6 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultLeaderElector.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultLeaderElector.java
@@ -60,6 +60,16 @@
     }
 
     @Override
+    public boolean promote(String topic, NodeId nodeId) {
+        return complete(asyncElector.promote(topic, nodeId));
+    }
+
+    @Override
+    public void evict(NodeId nodeId) {
+        complete(asyncElector.evict(nodeId));
+    }
+
+    @Override
     public Leadership getLeadership(String topic) {
         return complete(asyncElector.getLeadership(topic));
     }
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncLeaderElector.java b/core/api/src/main/java/org/onosproject/store/service/AsyncLeaderElector.java
index 3aed8d2..2cac6fe 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncLeaderElector.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncLeaderElector.java
@@ -76,6 +76,28 @@
     CompletableFuture<Boolean> anoint(String topic, NodeId nodeId);
 
     /**
+     * Attempts to evict a node from all leadership elections it is registered for.
+     * <p>
+     * If the node is the current leader for a topic, this call will promote the next top candidate
+     * (if one exists) to leadership.
+     *
+     * @param nodeId node instance identifier
+     * @return CompletableFuture that is completed when the operation is done.
+     */
+    CompletableFuture<Void> evict(NodeId nodeId);
+
+    /**
+     * Attempts to promote a node to top of candidate list without displacing the current leader.
+     *
+     * @param topic leadership topic
+     * @param nodeId instance identifier of the new top candidate
+     * @return CompletableFuture that is completed with a boolean when the operation is done. Boolean is true if
+     * node is now the top candidate. This operation can fail (i.e. return false) if the node
+     * is not registered to run for election for the topic.
+     */
+    CompletableFuture<Boolean> promote(String topic, NodeId nodeId);
+
+    /**
      * Returns the {@link Leadership} for the specified topic.
      * @param topic leadership topic
      * @return CompletableFuture that is completed with the current Leadership state of the topic
diff --git a/core/api/src/main/java/org/onosproject/store/service/LeaderElector.java b/core/api/src/main/java/org/onosproject/store/service/LeaderElector.java
index b37d1c4..f4257be 100644
--- a/core/api/src/main/java/org/onosproject/store/service/LeaderElector.java
+++ b/core/api/src/main/java/org/onosproject/store/service/LeaderElector.java
@@ -58,6 +58,25 @@
     boolean anoint(String topic, NodeId nodeId);
 
     /**
+     * Attempts to promote a node to top of candidate list.
+     *
+     * @param topic leadership topic
+     * @param nodeId instance identifier of the new top candidate
+     * @return {@code true} if node is now the top candidate. This operation can fail (i.e. return
+     * {@code false}) if the node is not registered to run for election for the topic.
+     */
+    boolean promote(String topic, NodeId nodeId);
+
+    /**
+     * Attempts to evict a node from all leadership elections it is registered for.
+     * <p>
+     * If the node the current leader for a topic, this call will force the next candidate (if one exists)
+     * to be promoted to leadership.
+     * @param nodeId node instance identifier
+     */
+    void evict(NodeId nodeId);
+
+    /**
      * Returns the {@link Leadership} for the specified topic.
      * @param topic leadership topic
      * @return current Leadership state of the topic
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/NewDistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/NewDistributedLeadershipStore.java
index ad1d0a2..914da94 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/NewDistributedLeadershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/NewDistributedLeadershipStore.java
@@ -128,8 +128,7 @@
 
     @Override
     public void removeRegistration(NodeId nodeId) {
-        // TODO
-        throw new UnsupportedOperationException();
+        leaderElector.evict(nodeId);
     }
 
     @Override
@@ -139,8 +138,7 @@
 
     @Override
     public boolean makeTopCandidate(String topic, NodeId nodeId) {
-        // TODO
-        throw new UnsupportedOperationException();
+        return leaderElector.promote(topic, nodeId);
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncLeaderElector.java
index e364a7e..27c92ec 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncLeaderElector.java
@@ -70,6 +70,18 @@
     }
 
     @Override
+    public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
+        return getLeaderElector(topic).promote(topic, nodeId);
+    }
+
+    @Override
+    public CompletableFuture<Void> evict(NodeId nodeId) {
+        return CompletableFuture.allOf(getLeaderElectors().stream()
+                                                          .map(le -> le.evict(nodeId))
+                                                          .toArray(CompletableFuture[]::new));
+    }
+
+    @Override
     public CompletableFuture<Leadership> getLeadership(String topic) {
         return getLeaderElector(topic).getLeadership(topic);
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index 9995c4d..b8c5afa 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -20,6 +20,7 @@
 import io.atomix.resource.Resource;
 import io.atomix.resource.ResourceTypeInfo;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -43,6 +44,7 @@
     private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
             Sets.newConcurrentHashSet();
 
+    public static final String CHANGE_SUBJECT = "changeEvents";
     private Listener<Change<Leadership>> listener;
 
     public AtomixLeaderElector(CopycatClient client, Resource.Options options) {
@@ -57,13 +59,13 @@
     @Override
     public CompletableFuture<AtomixLeaderElector> open() {
         return super.open().thenApply(result -> {
-            client.onEvent("change", this::handleEvent);
+            client.onEvent(CHANGE_SUBJECT, this::handleEvent);
             return result;
         });
     }
 
-    private void handleEvent(Change<Leadership> change) {
-        leadershipChangeListeners.forEach(l -> l.accept(change));
+    private void handleEvent(List<Change<Leadership>> changes) {
+        changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
     }
 
     @Override
@@ -82,6 +84,16 @@
     }
 
     @Override
+    public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
+        return submit(new AtomixLeaderElectorCommands.Promote(topic, nodeId));
+    }
+
+    @Override
+    public CompletableFuture<Void> evict(NodeId nodeId) {
+        return submit(new AtomixLeaderElectorCommands.Evict(nodeId));
+    }
+
+    @Override
     public CompletableFuture<Leadership> getLeadership(String topic) {
         return submit(new AtomixLeaderElectorCommands.GetLeadership(topic));
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
index 235bd07..8efa234 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
@@ -346,6 +346,102 @@
     }
 
     /**
+     * Command for administratively promote a node as top candidate.
+     */
+    @SuppressWarnings("serial")
+    public static class Promote extends ElectionCommand<Boolean> {
+        private String topic;
+        private NodeId nodeId;
+
+        public Promote() {
+        }
+
+        public Promote(String topic, NodeId nodeId) {
+            this.topic = topic;
+            this.nodeId = nodeId;
+        }
+
+        /**
+         * Returns the topic.
+         *
+         * @return The topic
+         */
+        public String topic() {
+            return topic;
+        }
+
+        /**
+         * Returns the nodeId to make top candidate.
+         *
+         * @return The nodeId
+         */
+        public NodeId nodeId() {
+            return nodeId;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("topic", topic)
+                    .add("nodeId", nodeId)
+                    .toString();
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            buffer.writeString(topic);
+            buffer.writeString(nodeId.toString());
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            topic = buffer.readString();
+            nodeId = new NodeId(buffer.readString());
+        }
+    }
+
+    /**
+     * Command for administratively evicting a node from all leadership topics.
+     */
+    @SuppressWarnings("serial")
+    public static class Evict extends ElectionCommand<Void> {
+        private NodeId nodeId;
+
+        public Evict() {
+        }
+
+        public Evict(NodeId nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        /**
+         * Returns the node identifier.
+         *
+         * @return The nodeId
+         */
+        public NodeId nodeId() {
+            return nodeId;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("nodeId", nodeId)
+                    .toString();
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            buffer.writeString(nodeId.toString());
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            nodeId = new NodeId(buffer.readString());
+        }
+    }
+
+    /**
      * Map command type resolver.
      */
     public static class TypeResolver implements SerializableTypeResolver {
@@ -359,6 +455,8 @@
             registry.register(GetLeadership.class, -866);
             registry.register(Listen.class, -867);
             registry.register(Unlisten.class, -868);
+            registry.register(Promote.class, -869);
+            registry.register(Evict.class, -870);
         }
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
index 23dcf52..ef8c444 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -43,10 +43,12 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.event.Change;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Evict;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
@@ -86,6 +88,8 @@
         executor.register(Run.class, this::run);
         executor.register(Withdraw.class, this::withdraw);
         executor.register(Anoint.class, this::anoint);
+        executor.register(Promote.class, this::promote);
+        executor.register(Evict.class, this::evict);
         // Queries
         executor.register(GetLeadership.class, this::leadership);
         executor.register(GetAllLeaderships.class, this::allLeaderships);
@@ -93,8 +97,16 @@
     }
 
     private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
-        Change<Leadership> change = new Change<>(previousLeadership, newLeadership);
-        listeners.values().forEach(listener -> listener.session().publish("change", change));
+        notifyLeadershipChanges(Arrays.asList(new Change<>(previousLeadership, newLeadership)));
+    }
+
+    private void notifyLeadershipChanges(List<Change<Leadership>> changes) {
+        if (changes.isEmpty()) {
+            return;
+        }
+        listeners.values()
+                 .forEach(listener -> listener.session()
+                                              .publish(AtomixLeaderElector.CHANGE_SUBJECT, changes));
     }
 
     @Override
@@ -206,6 +218,53 @@
     }
 
     /**
+     * Applies an {@link AtomixLeaderElectorCommands.Promote} commit.
+     * @param commit promote commit
+     * @return {@code true} if changes desired end state is achieved.
+     */
+    public boolean promote(Commit<? extends Promote> commit) {
+        try {
+            String topic = commit.operation().topic();
+            NodeId nodeId = commit.operation().nodeId();
+            Leadership oldLeadership = leadership(topic);
+            if (oldLeadership == null || !oldLeadership.candidates().contains(nodeId)) {
+                return false;
+            }
+            elections.computeIfPresent(topic, (k, v) -> new ElectionState(v).promote(commit.operation().nodeId()));
+            Leadership newLeadership = leadership(topic);
+            if (!Objects.equal(oldLeadership, newLeadership)) {
+                notifyLeadershipChange(oldLeadership, newLeadership);
+            }
+            return true;
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Applies an {@link AtomixLeaderElectorCommands.Evict} commit.
+     * @param commit evict commit
+     */
+    public void evict(Commit<? extends Evict> commit) {
+        try {
+            List<Change<Leadership>> changes = Lists.newLinkedList();
+            NodeId nodeId = commit.operation().nodeId();
+            Set<String> topics = Maps.filterValues(elections, e -> e.candidates().contains(nodeId)).keySet();
+            topics.forEach(topic -> {
+                Leadership oldLeadership = leadership(topic);
+                elections.compute(topic, (k, v) -> v.evict(nodeId, termCounter(topic)::incrementAndGet));
+                Leadership newLeadership = leadership(topic);
+                if (!Objects.equal(oldLeadership, newLeadership)) {
+                    changes.add(new Change<>(oldLeadership, newLeadership));
+                }
+            });
+            notifyLeadershipChanges(changes);
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
      * Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
      * @param commit GetLeadership commit
      * @return leader
@@ -362,6 +421,31 @@
             }
         }
 
+        public ElectionState evict(NodeId nodeId, Supplier<Long> termCounter) {
+            Optional<Registration> registration =
+                    registrations.stream().filter(r -> r.nodeId.equals(nodeId)).findFirst();
+            if (registration.isPresent()) {
+                List<Registration> updatedRegistrations =
+                        registrations.stream()
+                        .filter(r -> !r.nodeId().equals(nodeId))
+                        .collect(Collectors.toList());
+                if (leader.nodeId().equals(nodeId)) {
+                    if (updatedRegistrations.size() > 0) {
+                        return new ElectionState(updatedRegistrations,
+                                updatedRegistrations.get(0),
+                                termCounter.get(),
+                                System.currentTimeMillis());
+                    } else {
+                        return new ElectionState(updatedRegistrations, null, term, termStartTime);
+                    }
+                } else {
+                    return new ElectionState(updatedRegistrations, leader, term, termStartTime);
+                }
+            } else {
+                return this;
+            }
+        }
+
         public boolean isDuplicate(Registration registration) {
             return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
         }
@@ -406,6 +490,23 @@
                 return this;
             }
         }
+
+        public ElectionState promote(NodeId nodeId) {
+            Registration registration = registrations.stream()
+                                                  .filter(r -> r.nodeId().equals(nodeId))
+                                                  .findFirst()
+                                                  .orElse(null);
+            List<Registration> updatedRegistrations = Lists.newLinkedList();
+            updatedRegistrations.add(registration);
+            registrations.stream()
+                         .filter(r -> !r.nodeId().equals(nodeId))
+                         .forEach(updatedRegistrations::add);
+            return new ElectionState(updatedRegistrations,
+                                    leader,
+                                    term,
+                                    termStartTime);
+
+        }
     }
 
     @Override
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
index 1990534..2b1f56b 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
@@ -51,10 +51,10 @@
     public void testRun() throws Throwable {
         leaderElectorRunTests(1);
         clearTests();
-//        leaderElectorRunTests(2);
-//        clearTests();
-//        leaderElectorRunTests(3);
-//        clearTests();
+        leaderElectorRunTests(2);
+        clearTests();
+        leaderElectorRunTests(3);
+        clearTests();
     }
 
     private void leaderElectorRunTests(int numServers) throws Throwable {
@@ -183,6 +183,63 @@
     }
 
     @Test
+    public void testPromote() throws Throwable {
+        leaderElectorPromoteTests(1);
+        clearTests();
+        leaderElectorPromoteTests(2);
+        clearTests();
+        leaderElectorPromoteTests(3);
+        clearTests();
+    }
+
+    private void leaderElectorPromoteTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        Atomix client1 = createAtomixClient();
+        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+        Atomix client2 = createAtomixClient();
+        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+        Atomix client3 = createAtomixClient();
+        AtomixLeaderElector elector3 = client3.get("test-elector", AtomixLeaderElector.class).join();
+        elector1.run("foo", node1).join();
+        elector2.run("foo", node2).join();
+
+        LeaderEventListener listener1 = new LeaderEventListener();
+        elector1.addChangeListener(listener1).join();
+        LeaderEventListener listener2 = new LeaderEventListener();
+        elector2.addChangeListener(listener2).join();
+        LeaderEventListener listener3 = new LeaderEventListener();
+        elector3.addChangeListener(listener3).join();
+
+        elector3.promote("foo", node3).thenAccept(result -> {
+            assertFalse(result);
+        }).join();
+
+        assertFalse(listener1.hasEvent());
+        assertFalse(listener2.hasEvent());
+        assertFalse(listener3.hasEvent());
+
+        elector3.run("foo", node3).join();
+
+        listener1.clearEvents();
+        listener2.clearEvents();
+        listener3.clearEvents();
+
+        elector3.promote("foo", node3).thenAccept(result -> {
+            assertTrue(result);
+        }).join();
+
+        listener1.nextEvent().thenAccept(result -> {
+            assertEquals(node3, result.newValue().candidates().get(0));
+        }).join();
+        listener2.nextEvent().thenAccept(result -> {
+            assertEquals(node3, result.newValue().candidates().get(0));
+        }).join();
+        listener3.nextEvent().thenAccept(result -> {
+            assertEquals(node3, result.newValue().candidates().get(0));
+        }).join();
+    }
+
+    @Test
     public void testLeaderSessionClose() throws Throwable {
         leaderElectorLeaderSessionCloseTests(1);
         clearTests();
@@ -325,6 +382,10 @@
             return !eventQueue.isEmpty();
         }
 
+        public void clearEvents() {
+            eventQueue.clear();
+        }
+
         public CompletableFuture<Change<Leadership>> nextEvent() {
             synchronized (this) {
                 if (eventQueue.isEmpty()) {