Migrating to latest Atomix

Change-Id: Ie636d1b2623b7f83572dca0d70bd56734379e61a
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index f4a4252..6baa835 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -42,6 +42,9 @@
 import org.onosproject.store.service.AsyncLeaderElector;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Sets;
 
 /**
@@ -54,11 +57,34 @@
             Sets.newCopyOnWriteArraySet();
     private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
             Sets.newCopyOnWriteArraySet();
+    private final Consumer<Change<Leadership>> cacheUpdater;
+    private final Consumer<Status> statusListener;
 
     public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
+    private final LoadingCache<String, CompletableFuture<Leadership>> cache;
 
     public AtomixLeaderElector(CopycatClient client, Properties properties) {
         super(client, properties);
+        cache = CacheBuilder.newBuilder()
+                .maximumSize(1000) // keep at most 1000 topics cached
+                .build(CacheLoader.from(topic -> this.client.submit(new GetLeadership(topic))));
+        // A cache miss issues a GetLeadership query; cacheUpdater overwrites entries with reported changes.
+        cacheUpdater = change -> {
+            Leadership leadership = change.newValue();
+            cache.put(leadership.topic(), CompletableFuture.completedFuture(leadership));
+        };
+        statusListener = status -> {
+            if (status == Status.SUSPENDED || status == Status.INACTIVE) {
+                cache.invalidateAll(); // presumably change events can be missed in these states - confirm
+            }
+        };
+        addStatusChangeListener(statusListener);
+    }
+
+    @Override
+    public CompletableFuture<Void> destroy() {
+        removeStatusChangeListener(statusListener); // registered in the constructor
+        return removeChangeListener(cacheUpdater); // completes immediately if the updater was never added
     }
 
     @Override
@@ -74,53 +100,72 @@
         });
     }
 
+    /**
+     * Registers the cache updater as a leadership change listener so that
+     * reported changes replace the corresponding cached entries.
+     *
+     * @return future that is completed with this elector once the listener is added
+     */
+    public CompletableFuture<AtomixLeaderElector> setupCache() {
+        return addChangeListener(cacheUpdater).thenApply(v -> this);
+    }
+
     private void handleEvent(List<Change<Leadership>> changes) {
         changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
     }
 
     @Override
     public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
-        return submit(new Run(topic, nodeId));
+        return client.submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
     }
 
     @Override
     public CompletableFuture<Void> withdraw(String topic) {
-        return submit(new Withdraw(topic));
+        return client.submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic));
     }
 
     @Override
     public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
-        return submit(new Anoint(topic, nodeId));
+        return client.submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
     }
 
     @Override
     public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
-        return submit(new Promote(topic, nodeId));
+        return client.submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
     }
 
     @Override
     public CompletableFuture<Void> evict(NodeId nodeId) {
-        return submit(new AtomixLeaderElectorCommands.Evict(nodeId));
+        // An eviction is not scoped to one topic, so every cached leadership may now be stale.
+        return client.submit(new AtomixLeaderElectorCommands.Evict(nodeId))
+                .whenComplete((r, e) -> cache.invalidateAll());
     }
 
     @Override
     public CompletableFuture<Leadership> getLeadership(String topic) {
-        return submit(new GetLeadership(topic));
+        // Do not keep a failed lookup in the cache, otherwise every later call for the
+        // topic would replay the same failure instead of retrying the query.
+        return cache.getUnchecked(topic)
+                .whenComplete((r, e) -> {
+                    if (e != null) {
+                        cache.invalidate(topic);
+                    }
+                });
     }
 
     @Override
     public CompletableFuture<Map<String, Leadership>> getLeaderships() {
-        return submit(new GetAllLeaderships());
+        return client.submit(new GetAllLeaderships());
     }
 
     public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
-        return submit(new GetElectedTopics(nodeId));
+        return client.submit(new GetElectedTopics(nodeId));
     }
 
     @Override
     public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
         if (leadershipChangeListeners.isEmpty()) {
-            return submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
+            return client.submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
         } else {
             leadershipChangeListeners.add(consumer);
             return CompletableFuture.completedFuture(null);
@@ -130,7 +175,7 @@
     @Override
     public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
         if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
-            return submit(new Unlisten()).thenApply(v -> null);
+            return client.submit(new Unlisten()).thenApply(v -> null);
         }
         return CompletableFuture.completedFuture(null);
     }