Migrating to latest Atomix
Change-Id: Ie636d1b2623b7f83572dca0d70bd56734379e61a
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentMultimapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentMultimapCommands.java
index 6838ab3..251a7ca 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentMultimapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentMultimapCommands.java
@@ -48,10 +48,6 @@
@SuppressWarnings("serial")
public abstract static class MultimapCommand<V> implements Command<V>,
CatalystSerializable {
- @Override
- public ConsistencyLevel consistency() {
- return ConsistencyLevel.SEQUENTIAL;
- }
@Override
public String toString() {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimap.java
index 3d735be..45b4d56 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimap.java
@@ -70,95 +70,90 @@
@Override
public CompletableFuture<Integer> size() {
- return submit(new Size());
+ return client.submit(new Size());
}
@Override
public CompletableFuture<Boolean> isEmpty() {
- return submit(new IsEmpty());
+ return client.submit(new IsEmpty());
}
@Override
public CompletableFuture<Boolean> containsKey(String key) {
- return submit(new ContainsKey(key));
+ return client.submit(new ContainsKey(key));
}
@Override
public CompletableFuture<Boolean> containsValue(byte[] value) {
- return submit(new ContainsValue(value));
+ return client.submit(new ContainsValue(value));
}
@Override
public CompletableFuture<Boolean> containsEntry(String key, byte[] value) {
- return submit(new ContainsEntry(key, value));
+ return client.submit(new ContainsEntry(key, value));
}
@Override
public CompletableFuture<Boolean> put(String key, byte[] value) {
- return submit(new Put(key, Lists.newArrayList(value), null));
+ return client.submit(new Put(key, Lists.newArrayList(value), null));
}
@Override
public CompletableFuture<Boolean> remove(String key, byte[] value) {
- return submit(new MultiRemove(key,
- Lists.newArrayList(value),
- null));
+ return client.submit(new MultiRemove(key,
+ Lists.newArrayList(value),
+ null));
}
@Override
- public CompletableFuture<Boolean> removeAll(
- String key, Collection<? extends byte[]> values) {
- return submit(new MultiRemove(key, (Collection<byte[]>) values, null));
+ public CompletableFuture<Boolean> removeAll(String key, Collection<? extends byte[]> values) {
+ return client.submit(new MultiRemove(key, (Collection<byte[]>) values, null));
}
@Override
- public CompletableFuture<
- Versioned<Collection<? extends byte[]>>> removeAll(String key) {
- return submit(new RemoveAll(key, null));
+ public CompletableFuture<Versioned<Collection<? extends byte[]>>> removeAll(String key) {
+ return client.submit(new RemoveAll(key, null));
}
@Override
public CompletableFuture<Boolean> putAll(
String key, Collection<? extends byte[]> values) {
- return submit(new Put(key, values, null));
+ return client.submit(new Put(key, values, null));
}
@Override
- public CompletableFuture<
- Versioned<Collection<? extends byte[]>>> replaceValues(
+ public CompletableFuture<Versioned<Collection<? extends byte[]>>> replaceValues(
String key, Collection<byte[]> values) {
- return submit(new Replace(key, values, null));
+ return client.submit(new Replace(key, values, null));
}
@Override
public CompletableFuture<Void> clear() {
- return submit(new Clear());
+ return client.submit(new Clear());
}
@Override
- public CompletableFuture<
- Versioned<Collection<? extends byte[]>>> get(String key) {
- return submit(new Get(key));
+ public CompletableFuture<Versioned<Collection<? extends byte[]>>> get(String key) {
+ return client.submit(new Get(key));
}
@Override
public CompletableFuture<Set<String>> keySet() {
- return submit(new KeySet());
+ return client.submit(new KeySet());
}
@Override
public CompletableFuture<Multiset<String>> keys() {
- return submit(new Keys());
+ return client.submit(new Keys());
}
- @Override
public CompletableFuture<Multiset<byte[]>> values() {
- return submit(new Values());
+ return client.submit(new Values());
}
@Override
public CompletableFuture<Collection<Map.Entry<String, byte[]>>> entries() {
- return submit(new Entries());
+ return client.submit(new Entries());
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 9bad652..f1a44de 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -97,48 +97,48 @@
@Override
public CompletableFuture<Boolean> isEmpty() {
- return submit(new IsEmpty());
+ return client.submit(new IsEmpty());
}
@Override
public CompletableFuture<Integer> size() {
- return submit(new Size());
+ return client.submit(new Size());
}
@Override
public CompletableFuture<Boolean> containsKey(String key) {
- return submit(new ContainsKey(key));
+ return client.submit(new ContainsKey(key));
}
@Override
public CompletableFuture<Boolean> containsValue(byte[] value) {
- return submit(new ContainsValue(value));
+ return client.submit(new ContainsValue(value));
}
@Override
public CompletableFuture<Versioned<byte[]>> get(String key) {
- return submit(new Get(key));
+ return client.submit(new Get(key));
}
@Override
public CompletableFuture<Set<String>> keySet() {
- return submit(new KeySet());
+ return client.submit(new KeySet());
}
@Override
public CompletableFuture<Collection<Versioned<byte[]>>> values() {
- return submit(new Values());
+ return client.submit(new Values());
}
@Override
public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
- return submit(new EntrySet());
+ return client.submit(new EntrySet());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
- return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
+ return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue());
}
@@ -146,7 +146,7 @@
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
- return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
+ return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.newValue());
}
@@ -154,14 +154,14 @@
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
- return submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
+ return client.submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> remove(String key) {
- return submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
+ return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue());
}
@@ -169,7 +169,7 @@
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> remove(String key, byte[] value) {
- return submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
+ return client.submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated());
}
@@ -177,7 +177,7 @@
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> remove(String key, long version) {
- return submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
+ return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated());
}
@@ -185,7 +185,7 @@
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
- return submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
+ return client.submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue());
}
@@ -193,10 +193,7 @@
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
- return submit(new UpdateAndGet(key,
- newValue,
- Match.ifValue(oldValue),
- Match.ANY))
+ return client.submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated());
}
@@ -204,17 +201,14 @@
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
- return submit(new UpdateAndGet(key,
- newValue,
- Match.ANY,
- Match.ifValue(oldVersion)))
+ return client.submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated());
}
@Override
public CompletableFuture<Void> clear() {
- return submit(new Clear())
+ return client.submit(new Clear())
.whenComplete((r, e) -> throwIfLocked(r))
.thenApply(v -> null);
}
@@ -245,7 +239,7 @@
}
Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
- return submit(new UpdateAndGet(key,
+ return client.submit(new UpdateAndGet(key,
computedValue.get(),
valueMatch,
versionMatch))
@@ -258,7 +252,7 @@
public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
Executor executor) {
if (mapEventListeners.isEmpty()) {
- return submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
+ return client.submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
} else {
mapEventListeners.put(listener, executor);
return CompletableFuture.completedFuture(null);
@@ -268,7 +262,7 @@
@Override
public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
- return submit(new Unlisten()).thenApply(v -> null);
+ return client.submit(new Unlisten()).thenApply(v -> null);
}
return CompletableFuture.completedFuture(null);
}
@@ -281,23 +275,23 @@
@Override
public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
- return submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
+ return client.submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
}
@Override
public CompletableFuture<Void> commit(TransactionId transactionId) {
- return submit(new TransactionCommit(transactionId)).thenApply(v -> null);
+ return client.submit(new TransactionCommit(transactionId)).thenApply(v -> null);
}
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
- return submit(new TransactionRollback(transactionId))
+ return client.submit(new TransactionRollback(transactionId))
.thenApply(v -> null);
}
@Override
public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
- return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
+ return client.submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index ae75453..7e7b979 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -51,11 +51,6 @@
public abstract static class MapCommand<V> implements Command<V>, CatalystSerializable {
@Override
- public ConsistencyLevel consistency() {
- return ConsistencyLevel.SEQUENTIAL;
- }
-
- @Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.toString();
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index f4a4252..6baa835 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -42,6 +42,9 @@
import org.onosproject.store.service.AsyncLeaderElector;
import com.google.common.collect.ImmutableSet;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.Sets;
/**
@@ -54,11 +57,34 @@
Sets.newCopyOnWriteArraySet();
private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Sets.newCopyOnWriteArraySet();
+ private final Consumer<Change<Leadership>> cacheUpdater;
+ private final Consumer<Status> statusListener;
public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
+ private final LoadingCache<String, CompletableFuture<Leadership>> cache;
public AtomixLeaderElector(CopycatClient client, Properties properties) {
super(client, properties);
+ cache = CacheBuilder.newBuilder()
+ .maximumSize(1000)
+ .build(CacheLoader.from(topic -> this.client.submit(new GetLeadership(topic))));
+
+ cacheUpdater = change -> {
+ Leadership leadership = change.newValue();
+ cache.put(leadership.topic(), CompletableFuture.completedFuture(leadership));
+ };
+ statusListener = status -> {
+ if (status == Status.SUSPENDED || status == Status.INACTIVE) {
+ cache.invalidateAll();
+ }
+ };
+ addStatusChangeListener(statusListener);
+ }
+
+ @Override
+ public CompletableFuture<Void> destroy() {
+ removeStatusChangeListener(statusListener);
+ return removeChangeListener(cacheUpdater);
}
@Override
@@ -74,53 +100,57 @@
});
}
+ public CompletableFuture<AtomixLeaderElector> setupCache() {
+ return addChangeListener(cacheUpdater).thenApply(v -> this);
+ }
+
private void handleEvent(List<Change<Leadership>> changes) {
changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
}
@Override
public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
- return submit(new Run(topic, nodeId));
+ return client.submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
}
@Override
public CompletableFuture<Void> withdraw(String topic) {
- return submit(new Withdraw(topic));
+ return client.submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic));
}
@Override
public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
- return submit(new Anoint(topic, nodeId));
+ return client.submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
}
@Override
public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
- return submit(new Promote(topic, nodeId));
+ return client.submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
}
@Override
public CompletableFuture<Void> evict(NodeId nodeId) {
- return submit(new AtomixLeaderElectorCommands.Evict(nodeId));
+ return client.submit(new AtomixLeaderElectorCommands.Evict(nodeId));
}
@Override
public CompletableFuture<Leadership> getLeadership(String topic) {
- return submit(new GetLeadership(topic));
+ return cache.getUnchecked(topic);
}
@Override
public CompletableFuture<Map<String, Leadership>> getLeaderships() {
- return submit(new GetAllLeaderships());
+ return client.submit(new GetAllLeaderships());
}
public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
- return submit(new GetElectedTopics(nodeId));
+ return client.submit(new GetElectedTopics(nodeId));
}
@Override
public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
if (leadershipChangeListeners.isEmpty()) {
- return submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
+ return client.submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
} else {
leadershipChangeListeners.add(consumer);
return CompletableFuture.completedFuture(null);
@@ -130,7 +160,7 @@
@Override
public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
- return submit(new Unlisten()).thenApply(v -> null);
+ return client.submit(new Unlisten()).thenApply(v -> null);
}
return CompletableFuture.completedFuture(null);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
index 620d7f2..7ee481c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
@@ -50,11 +50,6 @@
public abstract static class ElectionQuery<V> implements Query<V>, CatalystSerializable {
@Override
- public ConsistencyLevel consistency() {
- return ConsistencyLevel.BOUNDED_LINEARIZABLE;
- }
-
- @Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
@@ -103,11 +98,6 @@
public abstract static class ElectionCommand<V> implements Command<V>, CatalystSerializable {
@Override
- public ConsistencyLevel consistency() {
- return ConsistencyLevel.LINEARIZABLE;
- }
-
- @Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
index df04e6d..894cc79 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -302,8 +302,10 @@
* @return topic to leader mapping
*/
public Map<String, Leadership> allLeaderships(Commit<? extends GetAllLeaderships> commit) {
+ Map<String, Leadership> result = new HashMap<>();
try {
- return Maps.transformEntries(elections, (k, v) -> leadership(k));
+ result.putAll(Maps.transformEntries(elections, (k, v) -> leadership(k)));
+ return result;
} finally {
commit.close();
}
@@ -539,7 +541,7 @@
byte[] encodedElections = serializer.encode(elections);
writer.writeInt(encodedElections.length);
writer.write(encodedElections);
- log.info("Took state machine snapshot");
+ log.debug("Took state machine snapshot");
}
@Override
@@ -552,7 +554,7 @@
byte[] encodedElections = new byte[encodedElectionsSize];
reader.read(encodedElections);
elections = serializer.decode(encodedElections);
- log.info("Reinstated state machine from snapshot");
+ log.debug("Reinstated state machine from snapshot");
}
private AtomicLong termCounter(String topic) {