State machine implementations for various distributed primitives based on latest Copycat APIs
Change-Id: I622cc196aa1cdf072a5a0b100a5ffaaf71b07900
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
new file mode 100644
index 0000000..46487f2
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.catalyst.util.Listener;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.Consistency;
+import io.atomix.resource.Resource;
+import io.atomix.resource.ResourceTypeInfo;
+
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+
+import org.onlab.util.Match;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Distributed resource providing the {@link AsyncConsistentMap} primitive.
+ */
+@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
+public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
+ implements AsyncConsistentMap<String, byte[]> {
+
+ private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
+
+ private static final String CHANGE_SUBJECT = "change";
+
+ public AtomixConsistentMap(CopycatClient client, Resource.Options options) {
+ super(client, options);
+ }
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<AtomixConsistentMap> open() {
+ return super.open().thenApply(result -> {
+ client.session().onEvent(CHANGE_SUBJECT, this::handleEvent);
+ return result;
+ });
+ }
+
+ private void handleEvent(MapEvent<String, byte[]> event) {
+ mapEventListeners.forEach(listener -> listener.event(event));
+ }
+
+ @Override
+ public AtomixConsistentMap with(Consistency consistency) {
+ super.with(consistency);
+ return this;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isEmpty() {
+ return submit(new AtomixConsistentMapCommands.IsEmpty());
+ }
+
+ @Override
+ public CompletableFuture<Integer> size() {
+ return submit(new AtomixConsistentMapCommands.Size());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsKey(String key) {
+ return submit(new AtomixConsistentMapCommands.ContainsKey(key));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsValue(byte[] value) {
+ return submit(new AtomixConsistentMapCommands.ContainsValue(value));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<byte[]>> get(String key) {
+ return submit(new AtomixConsistentMapCommands.Get(key));
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> keySet() {
+ return submit(new AtomixConsistentMapCommands.KeySet());
+ }
+
+ @Override
+ public CompletableFuture<Collection<Versioned<byte[]>>> values() {
+ return submit(new AtomixConsistentMapCommands.Values());
+ }
+
+ @Override
+ public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
+ return submit(new AtomixConsistentMapCommands.EntrySet());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.oldValue());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.newValue());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NULL, Match.ANY))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.oldValue());
+ }
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Versioned<byte[]>> remove(String key) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ANY))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.oldValue());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Boolean> remove(String key, byte[] value) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.updated());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Boolean> remove(String key, long version) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.updated());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.oldValue());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
+ newValue,
+ Match.ifValue(oldValue),
+ Match.ANY))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.updated());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
+ newValue,
+ Match.ANY,
+ Match.ifValue(oldVersion)))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.updated());
+ }
+
+ @Override
+ public CompletableFuture<Void> clear() {
+ return submit(new AtomixConsistentMapCommands.Clear())
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> null);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Versioned<byte[]>> computeIf(String key,
+ Predicate<? super byte[]> condition,
+ BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
+ return get(key).thenCompose(r1 -> {
+ byte[] existingValue = r1 == null ? null : r1.value();
+ // if the condition evaluates to false, return existing value.
+ if (!condition.test(existingValue)) {
+ return CompletableFuture.completedFuture(r1);
+ }
+
+ AtomicReference<byte[]> computedValue = new AtomicReference<>();
+ // if remappingFunction throws an exception, return the exception.
+ try {
+ computedValue.set(remappingFunction.apply(key, existingValue));
+ } catch (Exception e) {
+ CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
+ if (computedValue.get() == null && r1 == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
+ Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
+ computedValue.get(),
+ valueMatch,
+ versionMatch))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.newValue());
+ });
+ }
+
+ public CompletableFuture<PrepareResult> prepare(TransactionalMapUpdate<String, byte[]> update) {
+ return submit(new AtomixConsistentMapCommands.TransactionPrepare(update));
+ }
+
+ public CompletableFuture<CommitResult> commit(TransactionId transactionId) {
+ return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId));
+ }
+
+ public CompletableFuture<RollbackResult> rollback(TransactionId transactionId) {
+ return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId));
+ }
+
+ @Override
+ public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
+ if (!mapEventListeners.isEmpty()) {
+ if (mapEventListeners.add(listener)) {
+ return CompletableFuture.completedFuture(new ChangeListener(listener)).thenApply(v -> null);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+ mapEventListeners.add(listener);
+ return submit(new AtomixConsistentMapCommands.Listen()).thenApply(v -> null);
+ }
+
+ @Override
+ public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
+ if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
+ return submit(new AtomixConsistentMapCommands.Unlisten()).thenApply(v -> null);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ private void throwIfLocked(MapEntryUpdateResult.Status status) {
+ if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
+ throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
+ }
+ }
+
+ /**
+ * Change listener context.
+ */
+ private final class ChangeListener implements Listener<MapEvent<String, byte[]>> {
+ private final MapEventListener<String, byte[]> listener;
+
+ private ChangeListener(MapEventListener<String, byte[]> listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void accept(MapEvent<String, byte[]> event) {
+ listener.event(event);
+ }
+
+ @Override
+ public void close() {
+ synchronized (AtomixConsistentMap.this) {
+ mapEventListeners.remove(listener);
+ if (mapEventListeners.isEmpty()) {
+ submit(new AtomixConsistentMapCommands.Unlisten());
+ }
+ }
+ }
+ }
+}
\ No newline at end of file