Make caching optional for LeaderElector primitive.
Change-Id: Id77ae735ceed51c61e781cbcf4fc1111be1c1298
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
index b1bae21..746142e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -170,10 +170,11 @@
groupedThreads("onos/store/dist/cluster/leadership", "status-change-handler", log));
localNodeId = clusterService.getLocalNode().id();
leaderElector = storageService.leaderElectorBuilder()
- .withName("onos-leadership-elections")
- .withElectionTimeout(electionTimeoutMillis)
- .build()
- .asLeaderElector();
+ .withName("onos-leadership-elections")
+ .withElectionTimeout(electionTimeoutMillis)
+ .withRelaxedReadConsistency()
+ .build()
+ .asLeaderElector();
leaderElector.addChangeListener(leadershipChangeListener);
leaderElector.addStatusChangeListener(clientStatusListener);
upgradeService.addListener(upgradeListener);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncLeaderElector.java
new file mode 100644
index 0000000..91dcca5
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncLeaderElector.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+import org.onosproject.store.service.AsyncLeaderElector;
+
+/**
+ * Caching async leader elector.
+ */
+public class CachingAsyncLeaderElector extends DelegatingAsyncLeaderElector {
+ private final LoadingCache<String, CompletableFuture<Leadership>> cache;
+ private final Consumer<Change<Leadership>> cacheUpdater;
+ private final Consumer<Status> statusListener;
+
+ public CachingAsyncLeaderElector(AsyncLeaderElector delegateLeaderElector) {
+ super(delegateLeaderElector);
+ cache = CacheBuilder.newBuilder()
+ .maximumSize(1000)
+ .build(CacheLoader.from(super::getLeadership));
+
+ cacheUpdater = change -> {
+ Leadership leadership = change.newValue();
+ cache.put(leadership.topic(), CompletableFuture.completedFuture(leadership));
+ };
+ statusListener = status -> {
+ if (status == Status.SUSPENDED || status == Status.INACTIVE) {
+ cache.invalidateAll();
+ }
+ };
+ addChangeListener(cacheUpdater);
+ addStatusChangeListener(statusListener);
+ }
+
+ @Override
+ public CompletableFuture<Leadership> getLeadership(String topic) {
+ return cache.getUnchecked(topic)
+ .whenComplete((r, e) -> {
+ if (e != null) {
+ cache.invalidate(topic);
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
+ return super.run(topic, nodeId).whenComplete((r, e) -> cache.invalidate(topic));
+ }
+
+ @Override
+ public CompletableFuture<Void> withdraw(String topic) {
+ return super.withdraw(topic).whenComplete((r, e) -> cache.invalidate(topic));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
+ return super.anoint(topic, nodeId).whenComplete((r, e) -> cache.invalidate(topic));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
+ return super.promote(topic, nodeId).whenComplete((r, e) -> cache.invalidate(topic));
+ }
+
+ @Override
+ public CompletableFuture<Void> evict(NodeId nodeId) {
+ return super.evict(nodeId).whenComplete((r, e) -> cache.invalidateAll());
+ }
+
+ @Override
+ public CompletableFuture<Void> destroy() {
+ removeStatusChangeListener(statusListener);
+ return removeChangeListener(cacheUpdater);
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
index 9cef490..21ddf03 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
@@ -15,12 +15,12 @@
*/
package org.onosproject.store.primitives.impl;
-import java.util.concurrent.TimeUnit;
-
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.LeaderElectorBuilder;
+import java.util.concurrent.TimeUnit;
+
/**
* Default implementation of {@code LeaderElectorBuilder}.
*/
@@ -34,6 +34,11 @@
@Override
public AsyncLeaderElector build() {
- return primitiveCreator.newAsyncLeaderElector(name(), electionTimeoutMillis(), TimeUnit.MILLISECONDS);
+ AsyncLeaderElector leaderElector = primitiveCreator.newAsyncLeaderElector(name(), electionTimeoutMillis(),
+ TimeUnit.MILLISECONDS);
+ if (relaxedReadConsistency()) {
+ leaderElector = new CachingAsyncLeaderElector(leaderElector);
+ }
+ return leaderElector;
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncLeaderElector.java
new file mode 100644
index 0000000..0235616
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncLeaderElector.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+import org.onosproject.store.service.AsyncLeaderElector;
+
+/**
+ * Delegating leader elector.
+ */
+public class DelegatingAsyncLeaderElector extends DelegatingDistributedPrimitive implements AsyncLeaderElector {
+
+ private final AsyncLeaderElector delegateLeaderElector;
+
+ public DelegatingAsyncLeaderElector(AsyncLeaderElector delegateLeaderElector) {
+ super(delegateLeaderElector);
+ this.delegateLeaderElector = delegateLeaderElector;
+ }
+
+ @Override
+ public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
+ return delegateLeaderElector.run(topic, nodeId);
+ }
+
+ @Override
+ public CompletableFuture<Void> withdraw(String topic) {
+ return delegateLeaderElector.withdraw(topic);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
+ return delegateLeaderElector.anoint(topic, nodeId);
+ }
+
+ @Override
+ public CompletableFuture<Void> evict(NodeId nodeId) {
+ return delegateLeaderElector.evict(nodeId);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
+ return delegateLeaderElector.promote(topic, nodeId);
+ }
+
+ @Override
+ public CompletableFuture<Leadership> getLeadership(String topic) {
+ return delegateLeaderElector.getLeadership(topic);
+ }
+
+ @Override
+ public CompletableFuture<Map<String, Leadership>> getLeaderships() {
+ return delegateLeaderElector.getLeaderships();
+ }
+
+ @Override
+ public CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
+ return delegateLeaderElector.addChangeListener(consumer);
+ }
+
+ @Override
+ public CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
+ return delegateLeaderElector.removeChangeListener(consumer);
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index c5d281a..7c8dee8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -278,7 +278,7 @@
@Override
public AsyncLeaderElector newAsyncLeaderElector(String name, long leaderTimeout, TimeUnit timeUnit) {
- AtomixLeaderElector leaderElector = new AtomixLeaderElector(client.newProxyBuilder()
+ return new AtomixLeaderElector(client.newProxyBuilder()
.withName(name)
.withServiceType(DistributedPrimitive.Type.LEADER_ELECTOR.name())
.withReadConsistency(ReadConsistency.LINEARIZABLE)
@@ -289,8 +289,6 @@
.build()
.open()
.join());
- leaderElector.setupCache().join();
- return leaderElector;
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index f2f4a36..53c7094 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -21,9 +21,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
import com.google.common.collect.Sets;
import io.atomix.protocols.raft.proxy.RaftProxy;
import org.onlab.util.KryoNamespace;
@@ -63,29 +60,9 @@
.build());
private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners = Sets.newCopyOnWriteArraySet();
- private final Consumer<Change<Leadership>> cacheUpdater;
- private final Consumer<Status> statusListener;
-
- private final LoadingCache<String, CompletableFuture<Leadership>> cache;
public AtomixLeaderElector(RaftProxy proxy) {
super(proxy);
- cache = CacheBuilder.newBuilder()
- .maximumSize(1000)
- .build(CacheLoader.from(topic -> proxy.invoke(
- GET_LEADERSHIP, SERIALIZER::encode, new GetLeadership(topic), SERIALIZER::decode)));
-
- cacheUpdater = change -> {
- Leadership leadership = change.newValue();
- cache.put(leadership.topic(), CompletableFuture.completedFuture(leadership));
- };
- statusListener = status -> {
- if (status == Status.SUSPENDED || status == Status.INACTIVE) {
- cache.invalidateAll();
- }
- };
- addStatusChangeListener(statusListener);
-
proxy.addStateChangeListener(state -> {
if (state == RaftProxy.State.CONNECTED && isListening()) {
proxy.invoke(ADD_LISTENER);
@@ -94,43 +71,29 @@
proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleEvent);
}
- @Override
- public CompletableFuture<Void> destroy() {
- removeStatusChangeListener(statusListener);
- return removeChangeListener(cacheUpdater);
- }
-
- public CompletableFuture<AtomixLeaderElector> setupCache() {
- return addChangeListener(cacheUpdater).thenApply(v -> this);
- }
-
private void handleEvent(List<Change<Leadership>> changes) {
changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
}
@Override
public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
- return proxy.<Run, Leadership>invoke(RUN, SERIALIZER::encode, new Run(topic, nodeId), SERIALIZER::decode)
- .whenComplete((r, e) -> cache.invalidate(topic));
+ return proxy.<Run, Leadership>invoke(RUN, SERIALIZER::encode, new Run(topic, nodeId), SERIALIZER::decode);
}
@Override
public CompletableFuture<Void> withdraw(String topic) {
- return proxy.invoke(WITHDRAW, SERIALIZER::encode, new Withdraw(topic))
- .whenComplete((r, e) -> cache.invalidate(topic));
+ return proxy.invoke(WITHDRAW, SERIALIZER::encode, new Withdraw(topic));
}
@Override
public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
- return proxy.<Anoint, Boolean>invoke(ANOINT, SERIALIZER::encode, new Anoint(topic, nodeId), SERIALIZER::decode)
- .whenComplete((r, e) -> cache.invalidate(topic));
+ return proxy.<Anoint, Boolean>invoke(ANOINT, SERIALIZER::encode, new Anoint(topic, nodeId), SERIALIZER::decode);
}
@Override
public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
return proxy.<Promote, Boolean>invoke(
- PROMOTE, SERIALIZER::encode, new Promote(topic, nodeId), SERIALIZER::decode)
- .whenComplete((r, e) -> cache.invalidate(topic));
+ PROMOTE, SERIALIZER::encode, new Promote(topic, nodeId), SERIALIZER::decode);
}
@Override
@@ -140,12 +103,7 @@
@Override
public CompletableFuture<Leadership> getLeadership(String topic) {
- return cache.getUnchecked(topic)
- .whenComplete((r, e) -> {
- if (e != null) {
- cache.invalidate(topic);
- }
- });
+ return proxy.invoke(GET_LEADERSHIP, SERIALIZER::encode, new GetLeadership(topic), SERIALIZER::decode);
}
@Override