Migrating to latest Atomix
Change-Id: Ie636d1b2623b7f83572dca0d70bd56734379e61a
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index f4a4252..6baa835 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -42,6 +42,9 @@
import org.onosproject.store.service.AsyncLeaderElector;
import com.google.common.collect.ImmutableSet;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.Sets;
/**
@@ -54,11 +57,34 @@
Sets.newCopyOnWriteArraySet();
private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Sets.newCopyOnWriteArraySet();
+ private final Consumer<Change<Leadership>> cacheUpdater;
+ private final Consumer<Status> statusListener;
public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
+ private final LoadingCache<String, CompletableFuture<Leadership>> cache;
public AtomixLeaderElector(CopycatClient client, Properties properties) {
super(client, properties);
+ // Per-topic cache of leadership lookups, capped at 1000 entries; a miss is loaded
+ // by submitting a GetLeadership query through the client.
+ cache = CacheBuilder.newBuilder()
+ .maximumSize(1000)
+ .build(CacheLoader.from(key -> this.client.submit(new GetLeadership(key))));
+
+ // Each leadership change overwrites the cached entry for its topic with the new value.
+ cacheUpdater = event -> {
+ Leadership updated = event.newValue();
+ cache.put(updated.topic(), CompletableFuture.completedFuture(updated));
+ };
+ // Flush every cached entry when the status becomes SUSPENDED or INACTIVE.
+ statusListener = state -> {
+ if (state == Status.SUSPENDED || state == Status.INACTIVE) {
+ cache.invalidateAll();
+ }
+ };
+ addStatusChangeListener(statusListener);
+ }
+
+ @Override
+ public CompletableFuture<Void> destroy() {
+ removeStatusChangeListener(statusListener); // undo the registration made in the constructor
+ return removeChangeListener(cacheUpdater); // undo the registration made by setupCache()
}
@Override
@@ -74,53 +100,57 @@
});
}
+ /**
+ * Registers the cache updater as a leadership change listener.
+ *
+ * @return future completed with this elector once the listener has been added
+ */
+ public CompletableFuture<AtomixLeaderElector> setupCache() {
+ return addChangeListener(cacheUpdater).thenApply(ignored -> this);
+ }
+
private void handleEvent(List<Change<Leadership>> changes) {
changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
}
@Override
public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
- return submit(new Run(topic, nodeId));
+ // Whether the command succeeds or fails, drop the cached entry for this topic.
+ return client.submit(new Run(topic, nodeId))
+     .whenComplete((result, error) -> cache.invalidate(topic));
}
@Override
public CompletableFuture<Void> withdraw(String topic) {
- return submit(new Withdraw(topic));
+ // Invalidate the topic's cached leadership once the withdraw command completes.
+ return client.submit(new Withdraw(topic))
+     .whenComplete((result, error) -> cache.invalidate(topic));
}
@Override
public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
- return submit(new Anoint(topic, nodeId));
+ // Invalidate the topic's cached leadership once the anoint command completes.
+ return client.submit(new Anoint(topic, nodeId))
+     .whenComplete((result, error) -> cache.invalidate(topic));
}
@Override
public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
- return submit(new Promote(topic, nodeId));
+ // Invalidate the topic's cached leadership once the promote command completes.
+ return client.submit(new Promote(topic, nodeId))
+     .whenComplete((result, error) -> cache.invalidate(topic));
}
@Override
public CompletableFuture<Void> evict(NodeId nodeId) {
- return submit(new AtomixLeaderElectorCommands.Evict(nodeId));
+ // An eviction is keyed by node rather than by topic, so unlike run/withdraw/anoint/promote
+ // there is no single cache key to invalidate; flush the whole cache once the command
+ // completes (successfully or not) so later getLeadership calls re-query.
+ return client.submit(new AtomixLeaderElectorCommands.Evict(nodeId))
+     .whenComplete((r, e) -> cache.invalidateAll());
}
@Override
public CompletableFuture<Leadership> getLeadership(String topic) {
- return submit(new GetLeadership(topic));
+ // The cache stores futures, so a lookup that completes exceptionally would otherwise
+ // stay cached and be handed to every later caller; evict it so the next call retries.
+ return cache.getUnchecked(topic).whenComplete((r, e) -> {
+ if (e != null) {
+ cache.invalidate(topic);
+ }
+ });
}
@Override
public CompletableFuture<Map<String, Leadership>> getLeaderships() {
- return submit(new GetAllLeaderships());
+ return client.submit(new GetAllLeaderships()); // submitted directly; the per-topic cache is not consulted
}
public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
- return submit(new GetElectedTopics(nodeId));
+ return client.submit(new GetElectedTopics(nodeId)); // submitted directly; the per-topic cache is not consulted
}
@Override
public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
if (leadershipChangeListeners.isEmpty()) {
- return submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
+ return client.submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
} else {
leadershipChangeListeners.add(consumer);
return CompletableFuture.completedFuture(null);
@@ -130,7 +160,7 @@
@Override
public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
- return submit(new Unlisten()).thenApply(v -> null);
+ return client.submit(new Unlisten()).thenApply(v -> null); // last listener removed: submit Unlisten
}
return CompletableFuture.completedFuture(null);
}