Make caching optional for LeaderElector primitive.

Change-Id: Id77ae735ceed51c61e781cbcf4fc1111be1c1298
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
index 78d2d4e..85039a8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -120,6 +120,7 @@
         localNodeId = clusterService.getLocalNode().id();
         leaderElector = storageService.leaderElectorBuilder()
                       .withName("onos-leadership-elections")
+                      .withRelaxedReadConsistency()
                       .build()
                       .asLeaderElector();
         leaderElector.addChangeListener(leadershipChangeListener);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncLeaderElector.java
new file mode 100644
index 0000000..91dcca5
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncLeaderElector.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+import org.onosproject.store.service.AsyncLeaderElector;
+
+/**
+ * Caching async leader elector.
+ */
+public class CachingAsyncLeaderElector extends DelegatingAsyncLeaderElector {
+    private final LoadingCache<String, CompletableFuture<Leadership>> cache;
+    private final Consumer<Change<Leadership>> cacheUpdater;
+    private final Consumer<Status> statusListener;
+
+    public CachingAsyncLeaderElector(AsyncLeaderElector delegateLeaderElector) {
+        super(delegateLeaderElector);
+        cache = CacheBuilder.newBuilder()
+            .maximumSize(1000)
+            .build(CacheLoader.from(super::getLeadership));
+
+        cacheUpdater = change -> {
+            Leadership leadership = change.newValue();
+            cache.put(leadership.topic(), CompletableFuture.completedFuture(leadership));
+        };
+        statusListener = status -> {
+            if (status == Status.SUSPENDED || status == Status.INACTIVE) {
+                cache.invalidateAll();
+            }
+        };
+        addChangeListener(cacheUpdater);
+        addStatusChangeListener(statusListener);
+    }
+
+    @Override
+    public CompletableFuture<Leadership> getLeadership(String topic) {
+        return cache.getUnchecked(topic)
+            .whenComplete((r, e) -> {
+                if (e != null) {
+                    cache.invalidate(topic);
+                }
+            });
+    }
+
+    @Override
+    public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
+        return super.run(topic, nodeId).whenComplete((r, e) -> cache.invalidate(topic));
+    }
+
+    @Override
+    public CompletableFuture<Void> withdraw(String topic) {
+        return super.withdraw(topic).whenComplete((r, e) -> cache.invalidate(topic));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
+        return super.anoint(topic, nodeId).whenComplete((r, e) -> cache.invalidate(topic));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
+        return super.promote(topic, nodeId).whenComplete((r, e) -> cache.invalidate(topic));
+    }
+
+    @Override
+    public CompletableFuture<Void> evict(NodeId nodeId) {
+        return super.evict(nodeId).whenComplete((r, e) -> cache.invalidateAll());
+    }
+
+    @Override
+    public CompletableFuture<Void> destroy() {
+        removeStatusChangeListener(statusListener);
+        return removeChangeListener(cacheUpdater);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
index 24f3f9b..186668f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
@@ -32,6 +32,10 @@
 
     @Override
     public AsyncLeaderElector build() {
-        return primitiveCreator.newAsyncLeaderElector(name());
+        AsyncLeaderElector leaderElector = primitiveCreator.newAsyncLeaderElector(name());
+        if (relaxedReadConsistency()) {
+            leaderElector = new CachingAsyncLeaderElector(leaderElector);
+        }
+        return leaderElector;
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncLeaderElector.java
new file mode 100644
index 0000000..0235616
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncLeaderElector.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+import org.onosproject.store.service.AsyncLeaderElector;
+
+/**
+ * Delegating leader elector.
+ */
+public class DelegatingAsyncLeaderElector extends DelegatingDistributedPrimitive implements AsyncLeaderElector {
+
+    private final AsyncLeaderElector delegateLeaderElector;
+
+    public DelegatingAsyncLeaderElector(AsyncLeaderElector delegateLeaderElector) {
+        super(delegateLeaderElector);
+        this.delegateLeaderElector = delegateLeaderElector;
+    }
+
+    @Override
+    public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
+        return delegateLeaderElector.run(topic, nodeId);
+    }
+
+    @Override
+    public CompletableFuture<Void> withdraw(String topic) {
+        return delegateLeaderElector.withdraw(topic);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
+        return delegateLeaderElector.anoint(topic, nodeId);
+    }
+
+    @Override
+    public CompletableFuture<Void> evict(NodeId nodeId) {
+        return delegateLeaderElector.evict(nodeId);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
+        return delegateLeaderElector.promote(topic, nodeId);
+    }
+
+    @Override
+    public CompletableFuture<Leadership> getLeadership(String topic) {
+        return delegateLeaderElector.getLeadership(topic);
+    }
+
+    @Override
+    public CompletableFuture<Map<String, Leadership>> getLeaderships() {
+        return delegateLeaderElector.getLeaderships();
+    }
+
+    @Override
+    public CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
+        return delegateLeaderElector.addChangeListener(consumer);
+    }
+
+    @Override
+    public CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
+        return delegateLeaderElector.removeChangeListener(consumer);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index f0c61c2c1d..14af1c8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -278,7 +278,7 @@
 
     @Override
     public AsyncLeaderElector newAsyncLeaderElector(String name) {
-        AtomixLeaderElector leaderElector = new AtomixLeaderElector(client.newProxyBuilder()
+        return new AtomixLeaderElector(client.newProxyBuilder()
                 .withName(name)
                 .withServiceType(DistributedPrimitive.Type.LEADER_ELECTOR.name())
                 .withReadConsistency(ReadConsistency.LINEARIZABLE)
@@ -289,8 +289,6 @@
                 .build()
                 .open()
                 .join());
-        leaderElector.setupCache().join();
-        return leaderElector;
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index f2f4a36..53c7094 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -21,9 +21,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Sets;
 import io.atomix.protocols.raft.proxy.RaftProxy;
 import org.onlab.util.KryoNamespace;
@@ -63,29 +60,9 @@
             .build());
 
     private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners = Sets.newCopyOnWriteArraySet();
-    private final Consumer<Change<Leadership>> cacheUpdater;
-    private final Consumer<Status> statusListener;
-
-    private final LoadingCache<String, CompletableFuture<Leadership>> cache;
 
     public AtomixLeaderElector(RaftProxy proxy) {
         super(proxy);
-        cache = CacheBuilder.newBuilder()
-                .maximumSize(1000)
-                .build(CacheLoader.from(topic -> proxy.invoke(
-                        GET_LEADERSHIP, SERIALIZER::encode, new GetLeadership(topic), SERIALIZER::decode)));
-
-        cacheUpdater = change -> {
-            Leadership leadership = change.newValue();
-            cache.put(leadership.topic(), CompletableFuture.completedFuture(leadership));
-        };
-        statusListener = status -> {
-            if (status == Status.SUSPENDED || status == Status.INACTIVE) {
-                cache.invalidateAll();
-            }
-        };
-        addStatusChangeListener(statusListener);
-
         proxy.addStateChangeListener(state -> {
             if (state == RaftProxy.State.CONNECTED && isListening()) {
                 proxy.invoke(ADD_LISTENER);
@@ -94,43 +71,29 @@
         proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleEvent);
     }
 
-    @Override
-    public CompletableFuture<Void> destroy() {
-        removeStatusChangeListener(statusListener);
-        return removeChangeListener(cacheUpdater);
-    }
-
-    public CompletableFuture<AtomixLeaderElector> setupCache() {
-        return addChangeListener(cacheUpdater).thenApply(v -> this);
-    }
-
     private void handleEvent(List<Change<Leadership>> changes) {
         changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
     }
 
     @Override
     public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
-        return proxy.<Run, Leadership>invoke(RUN, SERIALIZER::encode, new Run(topic, nodeId), SERIALIZER::decode)
-                .whenComplete((r, e) -> cache.invalidate(topic));
+        return proxy.<Run, Leadership>invoke(RUN, SERIALIZER::encode, new Run(topic, nodeId), SERIALIZER::decode);
     }
 
     @Override
     public CompletableFuture<Void> withdraw(String topic) {
-        return proxy.invoke(WITHDRAW, SERIALIZER::encode, new Withdraw(topic))
-                .whenComplete((r, e) -> cache.invalidate(topic));
+        return proxy.invoke(WITHDRAW, SERIALIZER::encode, new Withdraw(topic));
     }
 
     @Override
     public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
-        return proxy.<Anoint, Boolean>invoke(ANOINT, SERIALIZER::encode, new Anoint(topic, nodeId), SERIALIZER::decode)
-                .whenComplete((r, e) -> cache.invalidate(topic));
+        return proxy.<Anoint, Boolean>invoke(ANOINT, SERIALIZER::encode, new Anoint(topic, nodeId), SERIALIZER::decode);
     }
 
     @Override
     public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
         return proxy.<Promote, Boolean>invoke(
-                PROMOTE, SERIALIZER::encode, new Promote(topic, nodeId), SERIALIZER::decode)
-                .whenComplete((r, e) -> cache.invalidate(topic));
+                PROMOTE, SERIALIZER::encode, new Promote(topic, nodeId), SERIALIZER::decode);
     }
 
     @Override
@@ -140,12 +103,7 @@
 
     @Override
     public CompletableFuture<Leadership> getLeadership(String topic) {
-        return cache.getUnchecked(topic)
-                .whenComplete((r, e) -> {
-                    if (e != null) {
-                        cache.invalidate(topic);
-                    }
-                });
+        return proxy.invoke(GET_LEADERSHIP, SERIALIZER::encode, new GetLeadership(topic), SERIALIZER::decode);
     }
 
     @Override