AsyncLeaderElector APIs in support of mastership balancing

Change-Id: Ia235c6a18c54490dc49ca13e2caebf70b750dbc7
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
index 23dcf52..ef8c444 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -43,10 +43,12 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.event.Change;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Evict;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
@@ -86,6 +88,8 @@
         executor.register(Run.class, this::run);
         executor.register(Withdraw.class, this::withdraw);
         executor.register(Anoint.class, this::anoint);
+        executor.register(Promote.class, this::promote);
+        executor.register(Evict.class, this::evict);
         // Queries
         executor.register(GetLeadership.class, this::leadership);
         executor.register(GetAllLeaderships.class, this::allLeaderships);
@@ -93,8 +97,16 @@
     }
 
     private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
-        Change<Leadership> change = new Change<>(previousLeadership, newLeadership);
-        listeners.values().forEach(listener -> listener.session().publish("change", change));
+        notifyLeadershipChanges(Arrays.asList(new Change<>(previousLeadership, newLeadership)));
+    }
+
+    private void notifyLeadershipChanges(List<Change<Leadership>> changes) {
+        if (changes.isEmpty()) {
+            return;
+        }
+        listeners.values()
+                 .forEach(listener -> listener.session()
+                                              .publish(AtomixLeaderElector.CHANGE_SUBJECT, changes));
     }
 
     @Override
@@ -206,6 +218,53 @@
     }
 
     /**
+     * Applies an {@link AtomixLeaderElectorCommands.Promote} commit for a topic's election.
+     * @param commit promote commit
+     * @return {@code true} if the node is a candidate for the topic, {@code false} otherwise
+     */
+    public boolean promote(Commit<? extends Promote> commit) {
+        try {
+            String topic = commit.operation().topic();
+            NodeId candidate = commit.operation().nodeId();
+            Leadership before = leadership(topic);
+            boolean eligible = before != null && before.candidates().contains(candidate);
+            if (eligible) {
+                elections.computeIfPresent(topic, (k, v) -> new ElectionState(v).promote(candidate));
+                Leadership after = leadership(topic);
+                if (!Objects.equal(before, after)) {
+                    notifyLeadershipChange(before, after);
+                }
+            }
+            return eligible;
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Applies an {@link AtomixLeaderElectorCommands.Evict} commit to every election listing the node.
+     * @param commit evict commit
+     */
+    public void evict(Commit<? extends Evict> commit) {
+        try {
+            List<Change<Leadership>> changes = Lists.newLinkedList();
+            NodeId nodeId = commit.operation().nodeId();
+            Set<String> topics = Maps.filterValues(elections, e -> e.candidates().contains(nodeId)).keySet();
+            for (String topic : Lists.newArrayList(topics)) { // copy: topics is a live view of elections
+                Leadership oldLeadership = leadership(topic);
+                elections.compute(topic, (k, v) -> v.evict(nodeId, termCounter(topic)::incrementAndGet));
+                Leadership newLeadership = leadership(topic);
+                if (!Objects.equal(oldLeadership, newLeadership)) {
+                    changes.add(new Change<>(oldLeadership, newLeadership));
+                }
+            }
+            notifyLeadershipChanges(changes);
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
      * Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
      * @param commit GetLeadership commit
      * @return leader
@@ -362,6 +421,31 @@
             }
         }
 
+        /**
+         * Returns the election state that results from removing all registrations of a node.
+         * @param nodeId node to evict
+         * @param termCounter supplies the new term when the evicted node was the leader
+         * @return updated state, or this state if the node holds no registration
+         */
+        public ElectionState evict(NodeId nodeId, Supplier<Long> termCounter) {
+            if (registrations.stream().noneMatch(r -> r.nodeId().equals(nodeId))) {
+                return this;
+            }
+            List<Registration> updatedRegistrations = registrations.stream()
+                    .filter(r -> !r.nodeId().equals(nodeId))
+                    .collect(Collectors.toList());
+            // No leader, or a different leader: only the registration list changes.
+            if (leader == null || !leader.nodeId().equals(nodeId)) {
+                return new ElectionState(updatedRegistrations, leader, term, termStartTime);
+            }
+            if (updatedRegistrations.isEmpty()) {
+                return new ElectionState(updatedRegistrations, null, term, termStartTime);
+            }
+            // The leader was evicted: the first remaining registration leads a new term.
+            return new ElectionState(updatedRegistrations, updatedRegistrations.get(0),
+                    termCounter.get(), System.currentTimeMillis());
+        }
+
         public boolean isDuplicate(Registration registration) {
             return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
         }
@@ -406,6 +490,23 @@
                 return this;
             }
         }
+
+        /**
+         * Returns a state in which the node's registration is first; leader and term are unchanged.
+         * @param nodeId node to promote
+         * @return reordered state, or this state if the node holds no registration
+         */
+        public ElectionState promote(NodeId nodeId) {
+            Registration registration =
+                    registrations.stream().filter(r -> r.nodeId().equals(nodeId)).findFirst().orElse(null);
+            if (registration == null) {
+                return this; // never insert a null registration for an unknown node
+            }
+            List<Registration> updatedRegistrations = Lists.newLinkedList();
+            updatedRegistrations.add(registration);
+            registrations.stream().filter(r -> !r.nodeId().equals(nodeId)).forEach(updatedRegistrations::add);
+            return new ElectionState(updatedRegistrations, leader, term, termStartTime);
+        }
     }
 
     @Override