State machine implementations for various distributed primitives based on latest Copycat APIs
Change-Id: I622cc196aa1cdf072a5a0b100a5ffaaf71b07900
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
new file mode 100644
index 0000000..f8e111b
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.serializer.TypeSerializerFactory;
+import io.atomix.copycat.client.Query;
+import io.atomix.manager.state.GetResource;
+import io.atomix.manager.state.GetResourceKeys;
+import io.atomix.resource.ResourceQuery;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.Scanner;
+
+import org.onlab.util.Match;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapState;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
+import org.onosproject.store.primitives.resources.impl.CommitResult;
+import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
+import org.onosproject.store.primitives.resources.impl.MapUpdate;
+import org.onosproject.store.primitives.resources.impl.PrepareResult;
+import org.onosproject.store.primitives.resources.impl.RollbackResult;
+import org.onosproject.store.primitives.resources.impl.TransactionId;
+import org.onosproject.store.primitives.resources.impl.TransactionalMapUpdate;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+
+/**
+ * Serializer utility for Atomix Catalyst.
+ */
+public final class CatalystSerializers {
+
+ private CatalystSerializers() {
+ }
+
+ public static Serializer getSerializer() {
+ Serializer serializer = new Serializer();
+ TypeSerializerFactory factory =
+ new DefaultCatalystTypeSerializerFactory(
+ org.onosproject.store.service.Serializer.using(Arrays.asList((KryoNamespaces.API)),
+ MapEntryUpdateResult.class,
+ MapEntryUpdateResult.Status.class,
+ MapUpdate.class,
+ MapUpdate.Type.class,
+ TransactionalMapUpdate.class,
+ TransactionId.class,
+ PrepareResult.class,
+ CommitResult.class,
+ RollbackResult.class,
+ AtomixConsistentMapCommands.Get.class,
+ AtomixConsistentMapCommands.ContainsKey.class,
+ AtomixConsistentMapCommands.ContainsValue.class,
+ AtomixConsistentMapCommands.Size.class,
+ AtomixConsistentMapCommands.IsEmpty.class,
+ AtomixConsistentMapCommands.KeySet.class,
+ AtomixConsistentMapCommands.EntrySet.class,
+ AtomixConsistentMapCommands.Values.class,
+ AtomixConsistentMapCommands.UpdateAndGet.class,
+ AtomixConsistentMapCommands.TransactionPrepare.class,
+ AtomixConsistentMapCommands.TransactionCommit.class,
+ AtomixConsistentMapCommands.TransactionRollback.class,
+ AtomixLeaderElectorCommands.GetLeadership.class,
+ AtomixLeaderElectorCommands.GetAllLeaderships.class,
+ AtomixLeaderElectorCommands.GetElectedTopics.class,
+ AtomixLeaderElectorCommands.Run.class,
+ AtomixLeaderElectorCommands.Withdraw.class,
+ AtomixLeaderElectorCommands.Anoint.class,
+ GetResource.class,
+ GetResourceKeys.class,
+ ResourceQuery.class,
+ Query.ConsistencyLevel.class));
+ // ONOS classes
+ serializer.register(Change.class, factory);
+ serializer.register(NodeId.class, factory);
+ serializer.register(Match.class, factory);
+ serializer.register(MapEntryUpdateResult.class, factory);
+ serializer.register(MapEntryUpdateResult.Status.class, factory);
+ serializer.register(TransactionalMapUpdate.class, factory);
+ serializer.register(PrepareResult.class, factory);
+ serializer.register(CommitResult.class, factory);
+ serializer.register(RollbackResult.class, factory);
+ serializer.register(TransactionId.class, factory);
+ serializer.register(MapUpdate.class, factory);
+ serializer.register(Versioned.class, factory);
+ serializer.register(MapEvent.class, factory);
+ serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
+ serializer.register(AtomixConsistentMapState.class, factory);
+
+ serializer.register(ResourceQuery.class, factory);
+ serializer.register(GetResource.class, factory);
+ serializer.register(GetResourceKeys.class, factory);
+
+ // ConsistentMap
+ serializer.register(AtomixConsistentMapCommands.UpdateAndGet.class, factory);
+ serializer.register(AtomixConsistentMapCommands.Clear.class);
+ serializer.register(AtomixConsistentMapCommands.Listen.class);
+ serializer.register(AtomixConsistentMapCommands.Unlisten.class);
+ serializer.register(AtomixConsistentMapCommands.Get.class);
+ serializer.register(AtomixConsistentMapCommands.ContainsKey.class);
+ serializer.register(AtomixConsistentMapCommands.ContainsValue.class);
+ serializer.register(AtomixConsistentMapCommands.EntrySet.class);
+ serializer.register(AtomixConsistentMapCommands.IsEmpty.class);
+ serializer.register(AtomixConsistentMapCommands.KeySet.class);
+ serializer.register(AtomixConsistentMapCommands.Size.class);
+ serializer.register(AtomixConsistentMapCommands.Values.class);
+ serializer.register(AtomixConsistentMapCommands.TransactionPrepare.class);
+ serializer.register(AtomixConsistentMapCommands.TransactionCommit.class);
+ serializer.register(AtomixConsistentMapCommands.TransactionRollback.class);
+ // LeaderElector
+ serializer.register(AtomixLeaderElectorCommands.Run.class, factory);
+ serializer.register(AtomixLeaderElectorCommands.Withdraw.class, factory);
+ serializer.register(AtomixLeaderElectorCommands.Anoint.class, factory);
+ serializer.register(AtomixLeaderElectorCommands.GetElectedTopics.class, factory);
+ serializer.register(AtomixLeaderElectorCommands.GetElectedTopics.class, factory);
+ serializer.register(AtomixLeaderElectorCommands.GetLeadership.class, factory);
+ serializer.register(AtomixLeaderElectorCommands.GetAllLeaderships.class, factory);
+ serializer.register(AtomixLeaderElectorCommands.Listen.class);
+ serializer.register(AtomixLeaderElectorCommands.Unlisten.class);
+ // Atomix types
+ try {
+ ClassLoader cl = CatalystSerializable.class.getClassLoader();
+ Enumeration<URL> urls = cl.getResources(
+ String.format("META-INF/services/%s", CatalystSerializable.class.getName()));
+ while (urls.hasMoreElements()) {
+ URL url = urls.nextElement();
+ try (Scanner scanner = new Scanner(url.openStream(), "UTF-8")) {
+ scanner.useDelimiter("\n").forEachRemaining(line -> {
+ if (!line.trim().startsWith("#")) {
+ line = line.trim();
+ if (line.length() > 0) {
+ try {
+ serializer.register(cl.loadClass(line));
+ } catch (ClassNotFoundException e) {
+ Throwables.propagate(e);
+ }
+ }
+ }
+ });
+ }
+ }
+ } catch (IOException e) {
+ Throwables.propagate(e);
+ }
+ return serializer;
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
new file mode 100644
index 0000000..46487f2
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.catalyst.util.Listener;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.Consistency;
+import io.atomix.resource.Resource;
+import io.atomix.resource.ResourceTypeInfo;
+
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+
+import org.onlab.util.Match;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Distributed resource providing the {@link AsyncConsistentMap} primitive.
+ */
+@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
+public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
+ implements AsyncConsistentMap<String, byte[]> {
+
+ private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
+
+ private static final String CHANGE_SUBJECT = "change";
+
+ public AtomixConsistentMap(CopycatClient client, Resource.Options options) {
+ super(client, options);
+ }
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<AtomixConsistentMap> open() {
+ return super.open().thenApply(result -> {
+ client.session().onEvent(CHANGE_SUBJECT, this::handleEvent);
+ return result;
+ });
+ }
+
+ private void handleEvent(MapEvent<String, byte[]> event) {
+ mapEventListeners.forEach(listener -> listener.event(event));
+ }
+
+ @Override
+ public AtomixConsistentMap with(Consistency consistency) {
+ super.with(consistency);
+ return this;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isEmpty() {
+ return submit(new AtomixConsistentMapCommands.IsEmpty());
+ }
+
+ @Override
+ public CompletableFuture<Integer> size() {
+ return submit(new AtomixConsistentMapCommands.Size());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsKey(String key) {
+ return submit(new AtomixConsistentMapCommands.ContainsKey(key));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsValue(byte[] value) {
+ return submit(new AtomixConsistentMapCommands.ContainsValue(value));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<byte[]>> get(String key) {
+ return submit(new AtomixConsistentMapCommands.Get(key));
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> keySet() {
+ return submit(new AtomixConsistentMapCommands.KeySet());
+ }
+
+ @Override
+ public CompletableFuture<Collection<Versioned<byte[]>>> values() {
+ return submit(new AtomixConsistentMapCommands.Values());
+ }
+
+ @Override
+ public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
+ return submit(new AtomixConsistentMapCommands.EntrySet());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.oldValue());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.newValue());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NULL, Match.ANY))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.oldValue());
+ }
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Versioned<byte[]>> remove(String key) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ANY))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.oldValue());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Boolean> remove(String key, byte[] value) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.updated());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Boolean> remove(String key, long version) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.updated());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.oldValue());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
+ newValue,
+ Match.ifValue(oldValue),
+ Match.ANY))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.updated());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
+ newValue,
+ Match.ANY,
+ Match.ifValue(oldVersion)))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.updated());
+ }
+
+ @Override
+ public CompletableFuture<Void> clear() {
+ return submit(new AtomixConsistentMapCommands.Clear())
+ .whenComplete((r, e) -> throwIfLocked(r))
+ .thenApply(v -> null);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public CompletableFuture<Versioned<byte[]>> computeIf(String key,
+ Predicate<? super byte[]> condition,
+ BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
+ return get(key).thenCompose(r1 -> {
+ byte[] existingValue = r1 == null ? null : r1.value();
+ // if the condition evaluates to false, return existing value.
+ if (!condition.test(existingValue)) {
+ return CompletableFuture.completedFuture(r1);
+ }
+
+ AtomicReference<byte[]> computedValue = new AtomicReference<>();
+ // if remappingFunction throws an exception, return the exception.
+ try {
+ computedValue.set(remappingFunction.apply(key, existingValue));
+ } catch (Exception e) {
+ CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
+ if (computedValue.get() == null && r1 == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
+ Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
+ return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
+ computedValue.get(),
+ valueMatch,
+ versionMatch))
+ .whenComplete((r, e) -> throwIfLocked(r.status()))
+ .thenApply(v -> v.newValue());
+ });
+ }
+
+ public CompletableFuture<PrepareResult> prepare(TransactionalMapUpdate<String, byte[]> update) {
+ return submit(new AtomixConsistentMapCommands.TransactionPrepare(update));
+ }
+
+ public CompletableFuture<CommitResult> commit(TransactionId transactionId) {
+ return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId));
+ }
+
+ public CompletableFuture<RollbackResult> rollback(TransactionId transactionId) {
+ return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId));
+ }
+
+ @Override
+ public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
+ if (!mapEventListeners.isEmpty()) {
+ if (mapEventListeners.add(listener)) {
+ return CompletableFuture.completedFuture(new ChangeListener(listener)).thenApply(v -> null);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+ mapEventListeners.add(listener);
+ return submit(new AtomixConsistentMapCommands.Listen()).thenApply(v -> null);
+ }
+
+ @Override
+ public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
+ if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
+ return submit(new AtomixConsistentMapCommands.Unlisten()).thenApply(v -> null);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ private void throwIfLocked(MapEntryUpdateResult.Status status) {
+ if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
+ throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
+ }
+ }
+
+ /**
+ * Change listener context.
+ */
+ private final class ChangeListener implements Listener<MapEvent<String, byte[]>> {
+ private final MapEventListener<String, byte[]> listener;
+
+ private ChangeListener(MapEventListener<String, byte[]> listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void accept(MapEvent<String, byte[]> event) {
+ listener.event(event);
+ }
+
+ @Override
+ public void close() {
+ synchronized (AtomixConsistentMap.this) {
+ mapEventListeners.remove(listener);
+ if (mapEventListeners.isEmpty()) {
+ submit(new AtomixConsistentMapCommands.Unlisten());
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
new file mode 100644
index 0000000..c463320
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -0,0 +1,515 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.catalyst.buffer.BufferInput;
+import io.atomix.catalyst.buffer.BufferOutput;
+import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.util.Assert;
+import io.atomix.copycat.client.Command;
+import io.atomix.copycat.client.Query;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.onlab.util.Match;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * {@link AtomixConsistentMap} resource state machine operations.
+ */
+public final class AtomixConsistentMapCommands {
+
+ private AtomixConsistentMapCommands() {
+ }
+
+ /**
+ * Abstract map command.
+ */
+ @SuppressWarnings("serial")
+ public abstract static class MapCommand<V> implements Command<V>, CatalystSerializable {
+
+ @Override
+ public ConsistencyLevel consistency() {
+ return ConsistencyLevel.LINEARIZABLE;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .toString();
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+ }
+
+ /**
+ * Abstract map query.
+ */
+ @SuppressWarnings("serial")
+ public abstract static class MapQuery<V> implements Query<V>, CatalystSerializable {
+
+ @Override
+ public ConsistencyLevel consistency() {
+ return ConsistencyLevel.BOUNDED_LINEARIZABLE;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .toString();
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+ }
+
+ /**
+ * Abstract key-based query.
+ */
+ @SuppressWarnings("serial")
+ public abstract static class KeyQuery<V> extends MapQuery<V> {
+ protected String key;
+
+ public KeyQuery() {
+ }
+
+ public KeyQuery(String key) {
+ this.key = Assert.notNull(key, "key");
+ }
+
+ /**
+ * Returns the key.
+ * @return key
+ */
+ public String key() {
+ return key;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("key", key)
+ .toString();
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ super.writeObject(buffer, serializer);
+ serializer.writeObject(key, buffer);
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ super.readObject(buffer, serializer);
+ key = serializer.readObject(buffer);
+ }
+ }
+
+ /**
+ * Abstract key-based query.
+ */
+ @SuppressWarnings("serial")
+ public abstract static class ValueQuery<V> extends MapQuery<V> {
+ protected byte[] value;
+
+ public ValueQuery() {
+ }
+
+ public ValueQuery(byte[] value) {
+ this.value = Assert.notNull(value, "value");
+ }
+
+ /**
+ * Returns the key.
+ * @return key
+ */
+ public byte[] value() {
+ return value;
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ super.writeObject(buffer, serializer);
+ serializer.writeObject(value, buffer);
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ super.readObject(buffer, serializer);
+ value = serializer.readObject(buffer);
+ }
+ }
+
+ /**
+ * Contains key command.
+ */
+ @SuppressWarnings("serial")
+ public static class ContainsKey extends KeyQuery<Boolean> {
+ public ContainsKey() {
+ }
+
+ public ContainsKey(String key) {
+ super(key);
+ }
+ }
+
+ /**
+ * Contains key command.
+ */
+ @SuppressWarnings("serial")
+ public static class ContainsValue extends ValueQuery<Boolean> {
+ public ContainsValue() {
+ }
+
+ public ContainsValue(byte[] value) {
+ super(value);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("value", value)
+ .toString();
+ }
+ }
+
+ /**
+ * Map prepare command.
+ */
+ @SuppressWarnings("serial")
+ public static class TransactionPrepare extends MapCommand<PrepareResult> {
+ private TransactionalMapUpdate<String, byte[]> update;
+
+ public TransactionPrepare() {
+ }
+
+ public TransactionPrepare(TransactionalMapUpdate<String, byte[]> update) {
+ this.update = update;
+ }
+
+ public TransactionalMapUpdate<String, byte[]> transactionUpdate() {
+ return update;
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ super.writeObject(buffer, serializer);
+ serializer.writeObject(update, buffer);
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ super.readObject(buffer, serializer);
+ update = serializer.readObject(buffer);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("update", update)
+ .toString();
+ }
+ }
+
+ /**
+ * Map transaction commit command.
+ */
+ @SuppressWarnings("serial")
+ public static class TransactionCommit extends MapCommand<CommitResult> {
+ private TransactionId transactionId;
+
+ public TransactionCommit() {
+ }
+
+ public TransactionCommit(TransactionId transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ /**
+ * Returns the transaction identifier.
+ * @return transaction id
+ */
+ public TransactionId transactionId() {
+ return transactionId;
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ super.writeObject(buffer, serializer);
+ serializer.writeObject(transactionId, buffer);
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ super.readObject(buffer, serializer);
+ transactionId = serializer.readObject(buffer);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("transactionId", transactionId)
+ .toString();
+ }
+ }
+
+ /**
+ * Map transaction rollback command.
+ */
+ @SuppressWarnings("serial")
+ public static class TransactionRollback extends MapCommand<RollbackResult> {
+ private TransactionId transactionId;
+
+ public TransactionRollback() {
+ }
+
+ public TransactionRollback(TransactionId transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ /**
+ * Returns the transaction identifier.
+ * @return transaction id
+ */
+ public TransactionId transactionId() {
+ return transactionId;
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ super.writeObject(buffer, serializer);
+ serializer.writeObject(transactionId, buffer);
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ super.readObject(buffer, serializer);
+ transactionId = serializer.readObject(buffer);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("transactionId", transactionId)
+ .toString();
+ }
+ }
+
+ /**
+ * Map update command.
+ */
+ @SuppressWarnings("serial")
+ public static class UpdateAndGet extends MapCommand<MapEntryUpdateResult<String, byte[]>> {
+ private String key;
+ private byte[] value;
+ private Match<byte[]> valueMatch;
+ private Match<Long> versionMatch;
+
+ public UpdateAndGet() {
+ }
+
+ public UpdateAndGet(String key,
+ byte[] value,
+ Match<byte[]> valueMatch,
+ Match<Long> versionMatch) {
+ this.key = key;
+ this.value = value;
+ this.valueMatch = valueMatch;
+ this.versionMatch = versionMatch;
+ }
+
+ /**
+ * Returns the key.
+ * @return key
+ */
+ public String key() {
+ return this.key;
+ }
+
+ /**
+ * Returns the value.
+ * @return value
+ */
+ public byte[] value() {
+ return this.value;
+ }
+
+ /**
+ * Returns the value match.
+ * @return value match
+ */
+ public Match<byte[]> valueMatch() {
+ return this.valueMatch;
+ }
+
+ /**
+ * Returns the version match.
+ * @return version match
+ */
+ public Match<Long> versionMatch() {
+ return this.versionMatch;
+ }
+
+ @Override
+ public CompactionMode compaction() {
+ return value == null ? CompactionMode.FULL : CompactionMode.QUORUM;
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ super.writeObject(buffer, serializer);
+ serializer.writeObject(key, buffer);
+ serializer.writeObject(value, buffer);
+ serializer.writeObject(valueMatch, buffer);
+ serializer.writeObject(versionMatch, buffer);
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ super.readObject(buffer, serializer);
+ key = serializer.readObject(buffer);
+ value = serializer.readObject(buffer);
+ valueMatch = serializer.readObject(buffer);
+ versionMatch = serializer.readObject(buffer);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("key", key)
+ .add("value", value)
+ .add("valueMatch", valueMatch)
+ .add("versionMatch", versionMatch)
+ .toString();
+ }
+ }
+
+ /**
+ * Get query.
+ */
+ @SuppressWarnings("serial")
+ public static class Get extends KeyQuery<Versioned<byte[]>> {
+ public Get() {
+ }
+
+ public Get(String key) {
+ super(key);
+ }
+ }
+
+ /**
+ * Is empty query.
+ */
+ @SuppressWarnings("serial")
+ public static class IsEmpty extends MapQuery<Boolean> {
+ }
+
+ /**
+ * KeySet query.
+ */
+ @SuppressWarnings("serial")
+ public static class KeySet extends MapQuery<Set<String>> {
+ }
+
+ /**
+ * KeySet query.
+ */
+ @SuppressWarnings("serial")
+ public static class Values extends MapQuery<Collection<Versioned<byte[]>>> {
+ }
+
+ /**
+ * KeySet query.
+ */
+ @SuppressWarnings("serial")
+ public static class EntrySet extends MapQuery<Set<Map.Entry<String, Versioned<byte[]>>>> {
+ }
+
+ /**
+ * Size query.
+ */
+ @SuppressWarnings("serial")
+ public static class Size extends MapQuery<Integer> {
+ }
+
+ /**
+ * Clear command.
+ */
+ @SuppressWarnings("serial")
+ public static class Clear extends MapCommand<MapEntryUpdateResult.Status> {
+
+ @Override
+ public CompactionMode compaction() {
+ return CompactionMode.FULL;
+ }
+ }
+
+ /**
+ * Change listen.
+ */
+ @SuppressWarnings("serial")
+ public static class Listen implements Command<Void>, CatalystSerializable {
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .toString();
+ }
+ }
+
+ /**
+ * Change unlisten.
+ */
+ @SuppressWarnings("serial")
+ public static class Unlisten implements Command<Void>, CatalystSerializable {
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .toString();
+ }
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
new file mode 100644
index 0000000..100941f
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -0,0 +1,626 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import static org.onosproject.store.service.MapEvent.Type.INSERT;
+import static org.onosproject.store.service.MapEvent.Type.REMOVE;
+import static org.onosproject.store.service.MapEvent.Type.UPDATE;
+import io.atomix.copycat.client.session.Session;
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.SessionListener;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import io.atomix.resource.ResourceStateMachine;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.onlab.util.CountDownCompleter;
+import org.onlab.util.Match;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * State Machine for {@link AtomixConsistentMap} resource.
+ */
+public class AtomixConsistentMapState extends ResourceStateMachine implements
+ SessionListener, Snapshottable {
+ private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
+ private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
+ private final Set<String> preparedKeys = Sets.newHashSet();
+ private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps
+ .newHashMap();
+ private AtomicLong versionCounter = new AtomicLong(0);
+
+ @Override
+ public void snapshot(SnapshotWriter writer) {
+ writer.writeLong(versionCounter.get());
+ }
+
+ @Override
+ public void install(SnapshotReader reader) {
+ versionCounter = new AtomicLong(reader.readLong());
+ }
+
+ @Override
+ protected void configure(StateMachineExecutor executor) {
+ // Listeners
+ executor.register(AtomixConsistentMapCommands.Listen.class,
+ this::listen);
+ executor.register(AtomixConsistentMapCommands.Unlisten.class,
+ this::unlisten);
+ // Queries
+ executor.register(AtomixConsistentMapCommands.ContainsKey.class,
+ this::containsKey);
+ executor.register(AtomixConsistentMapCommands.ContainsValue.class,
+ this::containsValue);
+ executor.register(AtomixConsistentMapCommands.EntrySet.class,
+ this::entrySet);
+ executor.register(AtomixConsistentMapCommands.Get.class, this::get);
+ executor.register(AtomixConsistentMapCommands.IsEmpty.class,
+ this::isEmpty);
+ executor.register(AtomixConsistentMapCommands.KeySet.class,
+ this::keySet);
+ executor.register(AtomixConsistentMapCommands.Size.class, this::size);
+ executor.register(AtomixConsistentMapCommands.Values.class,
+ this::values);
+ // Commands
+ executor.register(AtomixConsistentMapCommands.UpdateAndGet.class,
+ this::updateAndGet);
+ executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
+ executor.register(AtomixConsistentMapCommands.TransactionPrepare.class,
+ this::prepare);
+ executor.register(AtomixConsistentMapCommands.TransactionCommit.class,
+ this::commit);
+ executor.register(
+ AtomixConsistentMapCommands.TransactionRollback.class,
+ this::rollback);
+ }
+
+ @Override
+ public void delete() {
+ // Delete Listeners
+ listeners.values().forEach(Commit::close);
+ listeners.clear();
+
+ // Delete Map entries
+ mapEntries.values().forEach(MapEntryValue::discard);
+ mapEntries.clear();
+ }
+
+ /**
+ * Handles a contains key commit.
+ *
+ * @param commit
+ * containsKey commit
+ * @return {@code true} if map contains key
+ */
+ protected boolean containsKey(
+ Commit<? extends AtomixConsistentMapCommands.ContainsKey> commit) {
+ try {
+ return toVersioned(mapEntries.get(commit.operation().key())) != null;
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a contains value commit.
+ *
+ * @param commit
+ * containsValue commit
+ * @return {@code true} if map contains value
+ */
+ protected boolean containsValue(
+ Commit<? extends AtomixConsistentMapCommands.ContainsValue> commit) {
+ try {
+ Match<byte[]> valueMatch = Match
+ .ifValue(commit.operation().value());
+ return mapEntries.values().stream()
+ .anyMatch(value -> valueMatch.matches(value.value()));
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a get commit.
+ *
+ * @param commit
+ * get commit
+ * @return value mapped to key
+ */
+ protected Versioned<byte[]> get(
+ Commit<? extends AtomixConsistentMapCommands.Get> commit) {
+ try {
+ return toVersioned(mapEntries.get(commit.operation().key()));
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a count commit.
+ *
+ * @param commit
+ * size commit
+ * @return number of entries in map
+ */
+ protected int size(Commit<? extends AtomixConsistentMapCommands.Size> commit) {
+ try {
+ return mapEntries.size();
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles an is empty commit.
+ *
+ * @param commit
+ * isEmpty commit
+ * @return {@code true} if map is empty
+ */
+ protected boolean isEmpty(
+ Commit<? extends AtomixConsistentMapCommands.IsEmpty> commit) {
+ try {
+ return mapEntries.isEmpty();
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a keySet commit.
+ *
+ * @param commit
+ * keySet commit
+ * @return set of keys in map
+ */
+ protected Set<String> keySet(
+ Commit<? extends AtomixConsistentMapCommands.KeySet> commit) {
+ try {
+ return mapEntries.keySet();
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a values commit.
+ *
+ * @param commit
+ * values commit
+ * @return collection of values in map
+ */
+ protected Collection<Versioned<byte[]>> values(
+ Commit<? extends AtomixConsistentMapCommands.Values> commit) {
+ try {
+ return mapEntries.values().stream().map(this::toVersioned)
+ .collect(Collectors.toList());
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a entry set commit.
+ *
+ * @param commit
+ * entrySet commit
+ * @return set of map entries
+ */
+ protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(
+ Commit<? extends AtomixConsistentMapCommands.EntrySet> commit) {
+ try {
+ return mapEntries
+ .entrySet()
+ .stream()
+ .map(e -> Maps.immutableEntry(e.getKey(),
+ toVersioned(e.getValue())))
+ .collect(Collectors.toSet());
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a update and get commit.
+ *
+ * @param commit
+ * updateAndGet commit
+ * @return update result
+ */
+ protected MapEntryUpdateResult<String, byte[]> updateAndGet(
+ Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
+ MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
+ String key = commit.operation().key();
+ MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
+ Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
+
+ if (updateStatus != MapEntryUpdateResult.Status.OK) {
+ commit.close();
+ return new MapEntryUpdateResult<>(updateStatus, "", key,
+ oldMapValue, oldMapValue);
+ }
+
+ byte[] newValue = commit.operation().value();
+ long newVersion = versionCounter.incrementAndGet();
+ Versioned<byte[]> newMapValue = newValue == null ? null
+ : new Versioned<>(newValue, newVersion);
+
+ MapEvent.Type updateType = newValue == null ? REMOVE
+ : oldCommitValue == null ? INSERT : UPDATE;
+ if (updateType == REMOVE || updateType == UPDATE) {
+ mapEntries.remove(key);
+ oldCommitValue.discard();
+ }
+ if (updateType == INSERT || updateType == UPDATE) {
+ mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
+ }
+ notify(new MapEvent<>("", key, newMapValue, oldMapValue));
+ return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
+ newMapValue);
+ }
+
+ /**
+ * Handles a clear commit.
+ *
+ * @param commit
+ * clear commit
+ * @return clear result
+ */
+ protected MapEntryUpdateResult.Status clear(
+ Commit<? extends AtomixConsistentMapCommands.Clear> commit) {
+ try {
+ Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
+ .entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, MapEntryValue> entry = iterator.next();
+ String key = entry.getKey();
+ MapEntryValue value = entry.getValue();
+ Versioned<byte[]> removedValue = new Versioned<>(value.value(),
+ value.version());
+ notify(new MapEvent<>("", key, null, removedValue));
+ value.discard();
+ iterator.remove();
+ }
+ return MapEntryUpdateResult.Status.OK;
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles a listen commit.
+ *
+ * @param commit
+ * listen commit
+ */
+ protected void listen(
+ Commit<? extends AtomixConsistentMapCommands.Listen> commit) {
+ Long sessionId = commit.session().id();
+ listeners.put(sessionId, commit);
+ commit.session()
+ .onStateChange(
+ state -> {
+ if (state == Session.State.CLOSED
+ || state == Session.State.EXPIRED) {
+ Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
+ .remove(sessionId);
+ if (listener != null) {
+ listener.close();
+ }
+ }
+ });
+ }
+
+ /**
+ * Handles an unlisten commit.
+ *
+ * @param commit
+ * unlisten commit
+ */
+ protected void unlisten(
+ Commit<? extends AtomixConsistentMapCommands.Unlisten> commit) {
+ try {
+ Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
+ .remove(commit.session());
+ if (listener != null) {
+ listener.close();
+ }
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Triggers a change event.
+ *
+ * @param value
+ * map event
+ */
+ private void notify(MapEvent<String, byte[]> value) {
+ listeners.values().forEach(
+ commit -> commit.session().publish("change", value));
+ }
+
+ /**
+ * Handles an prepare commit.
+ *
+ * @param commit
+ * transaction prepare commit
+ * @return prepare result
+ */
+ protected PrepareResult prepare(
+ Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> commit) {
+ boolean ok = false;
+ try {
+ TransactionalMapUpdate<String, byte[]> transactionUpdate = commit
+ .operation().transactionUpdate();
+ for (MapUpdate<String, byte[]> update : transactionUpdate.batch()) {
+ String key = update.key();
+ if (preparedKeys.contains(key)) {
+ return PrepareResult.CONCURRENT_TRANSACTION;
+ }
+ MapEntryValue existingValue = mapEntries.get(key);
+ if (existingValue == null) {
+ if (update.currentValue() != null) {
+ return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
+ }
+ } else {
+ if (existingValue.version() != update.currentVersion()) {
+ return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
+ }
+ }
+ }
+ // No violations detected. Add to pendingTranctions and mark
+ // modified keys as
+ // currently locked to updates.
+ pendingTransactions.put(transactionUpdate.transactionId(), commit);
+ transactionUpdate.batch().forEach(u -> preparedKeys.add(u.key()));
+ ok = true;
+ return PrepareResult.OK;
+ } finally {
+ if (!ok) {
+ commit.close();
+ }
+ }
+ }
+
+ /**
+ * Handles an commit commit (ha!).
+ *
+ * @param commit transaction commit commit
+ * @return commit result
+ */
+ protected CommitResult commit(
+ Commit<? extends AtomixConsistentMapCommands.TransactionCommit> commit) {
+ TransactionId transactionId = commit.operation().transactionId();
+ try {
+ Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
+ .remove(transactionId);
+ if (prepareCommit == null) {
+ return CommitResult.UNKNOWN_TRANSACTION_ID;
+ }
+ TransactionalMapUpdate<String, byte[]> transactionalUpdate = prepareCommit
+ .operation().transactionUpdate();
+ long totalReferencesToCommit = transactionalUpdate
+ .batch()
+ .stream()
+ .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+ .count();
+ CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer =
+ new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
+ for (MapUpdate<String, byte[]> update : transactionalUpdate.batch()) {
+ String key = update.key();
+ MapEntryValue previousValue = mapEntries.remove(key);
+ MapEntryValue newValue = null;
+ checkState(preparedKeys.remove(key), "key is not prepared");
+ if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
+ newValue = new TransactionalCommit(key,
+ versionCounter.incrementAndGet(), completer);
+ }
+ mapEntries.put(key, newValue);
+ // Notify map listeners
+ notify(new MapEvent<>("", key, toVersioned(newValue),
+ toVersioned(previousValue)));
+ }
+ return CommitResult.OK;
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Handles an rollback commit (ha!).
+ *
+ * @param commit transaction rollback commit
+ * @return rollback result
+ */
+ protected RollbackResult rollback(
+ Commit<? extends AtomixConsistentMapCommands.TransactionRollback> commit) {
+ TransactionId transactionId = commit.operation().transactionId();
+ try {
+ Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
+ .remove(transactionId);
+ if (prepareCommit == null) {
+ return RollbackResult.UNKNOWN_TRANSACTION_ID;
+ } else {
+ prepareCommit.operation().transactionUpdate().batch()
+ .forEach(u -> preparedKeys.remove(u.key()));
+ prepareCommit.close();
+ return RollbackResult.OK;
+ }
+ } finally {
+ commit.close();
+ }
+ }
+
+ private MapEntryUpdateResult.Status validate(
+ AtomixConsistentMapCommands.UpdateAndGet update) {
+ MapEntryValue existingValue = mapEntries.get(update.key());
+ if (existingValue == null && update.value() == null) {
+ return MapEntryUpdateResult.Status.NOOP;
+ }
+ if (preparedKeys.contains(update.key())) {
+ return MapEntryUpdateResult.Status.WRITE_LOCK;
+ }
+ byte[] existingRawValue = existingValue == null ? null : existingValue
+ .value();
+ Long existingVersion = existingValue == null ? null : existingValue
+ .version();
+ return update.valueMatch().matches(existingRawValue)
+ && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
+ : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
+ }
+
+ private Versioned<byte[]> toVersioned(MapEntryValue value) {
+ return value == null ? null : new Versioned<>(value.value(),
+ value.version());
+ }
+
+ @Override
+ public void register(Session session) {
+ }
+
+ @Override
+ public void unregister(Session session) {
+ closeListener(session.id());
+ }
+
+ @Override
+ public void expire(Session session) {
+ closeListener(session.id());
+ }
+
+ @Override
+ public void close(Session session) {
+ closeListener(session.id());
+ }
+
+ private void closeListener(Long sessionId) {
+ Commit<? extends AtomixConsistentMapCommands.Listen> commit = listeners
+ .remove(sessionId);
+ if (commit != null) {
+ commit.close();
+ }
+ }
+
+ /**
+ * Interface implemented by map values.
+ */
+ private interface MapEntryValue {
+ /**
+ * Returns the raw {@code byte[]}.
+ *
+ * @return raw value
+ */
+ byte[] value();
+
+ /**
+ * Returns the version of the value.
+ *
+ * @return version
+ */
+ long version();
+
+ /**
+ * Discards the value by invoke appropriate clean up actions.
+ */
+ void discard();
+ }
+
+ /**
+ * A {@code MapEntryValue} that is derived from a non-transactional update
+ * i.e. via any standard map update operation.
+ */
+ private class NonTransactionalCommit implements MapEntryValue {
+ private final long version;
+ private final Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit;
+
+ public NonTransactionalCommit(
+ long version,
+ Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
+ this.version = version;
+ this.commit = commit;
+ }
+
+ @Override
+ public byte[] value() {
+ return commit.operation().value();
+ }
+
+ @Override
+ public long version() {
+ return version;
+ }
+
+ @Override
+ public void discard() {
+ commit.close();
+ }
+ }
+
+ /**
+ * A {@code MapEntryValue} that is derived from updates submitted via a
+ * transaction.
+ */
+ private class TransactionalCommit implements MapEntryValue {
+ private final String key;
+ private final long version;
+ private final CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer;
+
+ public TransactionalCommit(
+ String key,
+ long version,
+ CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> commit) {
+ this.key = key;
+ this.version = version;
+ this.completer = commit;
+ }
+
+ @Override
+ public byte[] value() {
+ TransactionalMapUpdate<String, byte[]> update = completer.object()
+ .operation().transactionUpdate();
+ return update.valueForKey(key);
+ }
+
+ @Override
+ public long version() {
+ return version;
+ }
+
+ @Override
+ public void discard() {
+ completer.countDown();
+ }
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixCounter.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixCounter.java
new file mode 100644
index 0000000..34ae507
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixCounter.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.variables.DistributedLong;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.onosproject.store.service.AsyncAtomicCounter;
+
+/**
+ * {@code AsyncAtomicCounter} implementation backed by Atomix
+ * {@link DistributedLong}.
+ */
+public class AtomixCounter implements AsyncAtomicCounter {
+
+ private final String name;
+ private final DistributedLong distLong;
+
+ public AtomixCounter(String name, DistributedLong distLong) {
+ this.name = name;
+ this.distLong = distLong;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public CompletableFuture<Long> incrementAndGet() {
+ return distLong.incrementAndGet();
+ }
+
+ @Override
+ public CompletableFuture<Long> getAndIncrement() {
+ return distLong.getAndIncrement();
+ }
+
+ @Override
+ public CompletableFuture<Long> getAndAdd(long delta) {
+ return distLong.getAndAdd(delta);
+ }
+
+ @Override
+ public CompletableFuture<Long> addAndGet(long delta) {
+ return distLong.addAndGet(delta);
+ }
+
+ @Override
+ public CompletableFuture<Long> get() {
+ return distLong.get();
+ }
+
+ @Override
+ public CompletableFuture<Void> set(long value) {
+ return distLong.set(value);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> compareAndSet(long expectedValue,
+ long updateValue) {
+ return distLong.compareAndSet(expectedValue, updateValue);
+ }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
new file mode 100644
index 0000000..b18d3da
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.catalyst.util.Listener;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.Consistency;
+import io.atomix.resource.Resource;
+import io.atomix.resource.ResourceTypeInfo;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+import org.onosproject.store.service.AsyncLeaderElector;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Distributed resource providing the {@link AsyncLeaderElector} primitive.
+ */
+@ResourceTypeInfo(id = -152, stateMachine = AtomixLeaderElectorState.class)
+public class AtomixLeaderElector
+ extends Resource<AtomixLeaderElector, Resource.Options> implements AsyncLeaderElector {
+ private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
+ Sets.newConcurrentHashSet();
+
+ private Listener<Change<Leadership>> listener;
+
+ public AtomixLeaderElector(CopycatClient client, Resource.Options options) {
+ super(client, options);
+ }
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<AtomixLeaderElector> open() {
+ return super.open().thenApply(result -> {
+ client.session().onEvent("change", this::handleEvent);
+ return result;
+ });
+ }
+
+ private void handleEvent(Change<Leadership> change) {
+ leadershipChangeListeners.forEach(l -> l.accept(change));
+ }
+
+ @Override
+ public AtomixLeaderElector with(Consistency consistency) {
+ super.with(consistency);
+ return this;
+ }
+
+ @Override
+ public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
+ return submit(new AtomixLeaderElectorCommands.Run(topic, nodeId));
+ }
+
+ @Override
+ public CompletableFuture<Void> withdraw(String topic) {
+ return submit(new AtomixLeaderElectorCommands.Withdraw(topic));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
+ return submit(new AtomixLeaderElectorCommands.Anoint(topic, nodeId));
+ }
+
+ @Override
+ public CompletableFuture<Leadership> getLeadership(String topic) {
+ return submit(new AtomixLeaderElectorCommands.GetLeadership(topic));
+ }
+
+ @Override
+ public CompletableFuture<Map<String, Leadership>> getLeaderships() {
+ return submit(new AtomixLeaderElectorCommands.GetAllLeaderships());
+ }
+
+ public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
+ return submit(new AtomixLeaderElectorCommands.GetElectedTopics(nodeId));
+ }
+
+ /**
+ * Leadership change listener context.
+ */
+ private final class LeadershipChangeListener implements Listener<Change<Leadership>> {
+ private final Consumer<Change<Leadership>> listener;
+
+ private LeadershipChangeListener(Consumer<Change<Leadership>> listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void accept(Change<Leadership> change) {
+ listener.accept(change);
+ }
+
+ @Override
+ public void close() {
+ synchronized (AtomixLeaderElector.this) {
+ submit(new AtomixLeaderElectorCommands.Unlisten());
+ }
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
+ leadershipChangeListeners.add(consumer);
+ return setupListener();
+ }
+
+ @Override
+ public CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
+ leadershipChangeListeners.remove(consumer);
+ return teardownListener();
+ }
+
+ private CompletableFuture<Void> setupListener() {
+ if (listener == null && !leadershipChangeListeners.isEmpty()) {
+ Consumer<Change<Leadership>> changeConsumer = change -> {
+ leadershipChangeListeners.forEach(consumer -> consumer.accept(change));
+ };
+ return submit(new AtomixLeaderElectorCommands.Listen())
+ .thenAccept(v -> listener = new LeadershipChangeListener(changeConsumer));
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ private CompletableFuture<Void> teardownListener() {
+ if (listener != null && leadershipChangeListeners.isEmpty()) {
+ listener.close();
+ listener = null;
+ return submit(new AtomixLeaderElectorCommands.Unlisten());
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
new file mode 100644
index 0000000..16f1769
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
@@ -0,0 +1,310 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Strings;
+
+import io.atomix.catalyst.buffer.BufferInput;
+import io.atomix.catalyst.buffer.BufferOutput;
+import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.util.Assert;
+import io.atomix.copycat.client.Command;
+import io.atomix.copycat.client.Query;
+
+/**
+ * {@link AtomixLeaderElector} resource state machine operations.
+ */
+public final class AtomixLeaderElectorCommands {
+
+ private AtomixLeaderElectorCommands() {
+ }
+
+ /**
+ * Abstract election query.
+ */
+ @SuppressWarnings("serial")
+ public abstract static class ElectionQuery<V> implements Query<V>, CatalystSerializable {
+
+ @Override
+ public ConsistencyLevel consistency() {
+ return ConsistencyLevel.BOUNDED_LINEARIZABLE;
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+ }
+
+ /**
+ * Abstract election topic query.
+ */
+ @SuppressWarnings("serial")
+ public abstract static class TopicQuery<V> extends ElectionQuery<V> implements CatalystSerializable {
+ String topic;
+
+ public TopicQuery() {
+ }
+
+ public TopicQuery(String topic) {
+ this.topic = Assert.notNull(topic, "topic");
+ }
+
+ /**
+ * Returns the topic.
+ * @return topic
+ */
+ public String topic() {
+ return topic;
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ serializer.writeObject(topic, buffer);
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ topic = serializer.readObject(buffer);
+ }
+ }
+
+ /**
+ * Abstract election command.
+ */
+ @SuppressWarnings("serial")
+ public abstract static class ElectionCommand<V> implements Command<V>, CatalystSerializable {
+
+ @Override
+ public ConsistencyLevel consistency() {
+ return ConsistencyLevel.LINEARIZABLE;
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+ }
+
+ /**
+ * Listen command.
+ */
+ @SuppressWarnings("serial")
+ public static class Listen extends ElectionCommand<Void> {
+ }
+
+ /**
+ * Unlisten command.
+ */
+ @SuppressWarnings("serial")
+ public static class Unlisten extends ElectionCommand<Void> {
+
+ @Override
+ public CompactionMode compaction() {
+ return CompactionMode.QUORUM;
+ }
+ }
+
+ /**
+ * GetLeader query.
+ */
+ @SuppressWarnings("serial")
+ public static class GetLeadership extends TopicQuery<Leadership> {
+
+ public GetLeadership() {
+ }
+
+ public GetLeadership(String topic) {
+ super(topic);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("topic", topic)
+ .toString();
+ }
+ }
+
+ /**
+ * GetAllLeaders query.
+ */
+ @SuppressWarnings("serial")
+ public static class GetAllLeaderships extends ElectionQuery<Map<String, Leadership>> {
+ }
+
+ /**
+ * GetElectedTopics query.
+ */
+ @SuppressWarnings("serial")
+ public static class GetElectedTopics extends ElectionQuery<Set<String>> {
+ private NodeId nodeId;
+
+ public GetElectedTopics() {
+ }
+
+ public GetElectedTopics(NodeId nodeId) {
+ this.nodeId = Assert.argNot(nodeId, nodeId == null, "nodeId cannot be null");
+ }
+
+ /**
+ * Returns the nodeId to check.
+ *
+ * @return The nodeId to check.
+ */
+ public NodeId nodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("nodeId", nodeId)
+ .toString();
+ }
+ }
+
+ /**
+ * Enter and run for leadership.
+ */
+ @SuppressWarnings("serial")
+ public static class Run extends ElectionCommand<Leadership> {
+ private String topic;
+ private NodeId nodeId;
+
+ public Run() {
+ }
+
+ public Run(String topic, NodeId nodeId) {
+ this.topic = Assert.argNot(topic, Strings.isNullOrEmpty(topic), "topic cannot be null or empty");
+ this.nodeId = Assert.argNot(nodeId, nodeId == null, "nodeId cannot be null");
+ }
+
+ /**
+ * Returns the topic.
+ *
+ * @return topic
+ */
+ public String topic() {
+ return topic;
+ }
+
+ /**
+ * Returns the nodeId.
+ *
+ * @return the nodeId
+ */
+ public NodeId nodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("topic", topic)
+ .add("nodeId", nodeId)
+ .toString();
+ }
+ }
+
+ /**
+ * Withdraw from a leadership contest.
+ */
+ @SuppressWarnings("serial")
+ public static class Withdraw extends ElectionCommand<Void> {
+ private String topic;
+
+ public Withdraw() {
+ }
+
+ public Withdraw(String topic) {
+ this.topic = Assert.argNot(topic, Strings.isNullOrEmpty(topic), "topic cannot be null or empty");
+ }
+
+ /**
+ * Returns the topic.
+ *
+ * @return The topic
+ */
+ public String topic() {
+ return topic;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("topic", topic)
+ .toString();
+ }
+ }
+
+ /**
+ * Command for administratively anointing a node as leader.
+ */
+ @SuppressWarnings("serial")
+ public static class Anoint extends ElectionCommand<Boolean> {
+ private String topic;
+ private NodeId nodeId;
+
+ public Anoint() {
+ }
+
+ public Anoint(String topic, NodeId nodeId) {
+ this.topic = topic;
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * Returns the topic.
+ *
+ * @return The topic
+ */
+ public String topic() {
+ return topic;
+ }
+
+ /**
+ * Returns the nodeId to make leader.
+ *
+ * @return The nodeId
+ */
+ public NodeId nodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("topic", topic)
+ .add("nodeId", nodeId)
+ .toString();
+ }
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
new file mode 100644
index 0000000..2ae6e68
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -0,0 +1,445 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+import io.atomix.copycat.client.session.Session;
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.SessionListener;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import io.atomix.resource.ResourceStateMachine;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.onosproject.cluster.Leader;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.slf4j.Logger;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * State machine for {@link AtomixLeaderElector} resource.
+ */
+public class AtomixLeaderElectorState extends ResourceStateMachine
+ implements SessionListener, Snapshottable {
+
+ private final Logger log = getLogger(getClass());
+ private Map<String, AtomicLong> termCounters = new HashMap<>();
+ private Map<String, ElectionState> elections = new HashMap<>();
+ private final Map<Long, Commit<? extends AtomixLeaderElectorCommands.Listen>> listeners = new LinkedHashMap<>();
+ private final Serializer serializer = Serializer.using(Arrays.asList(KryoNamespaces.API),
+ ElectionState.class,
+ Registration.class);
+
+ @Override
+ protected void configure(StateMachineExecutor executor) {
+ // Notification
+ executor.register(AtomixLeaderElectorCommands.Listen.class, this::listen);
+ executor.register(AtomixLeaderElectorCommands.Unlisten.class, this::unlisten);
+ // Commands
+ executor.register(AtomixLeaderElectorCommands.Run.class, this::run);
+ executor.register(AtomixLeaderElectorCommands.Withdraw.class, this::withdraw);
+ executor.register(AtomixLeaderElectorCommands.Anoint.class, this::anoint);
+ // Queries
+ executor.register(AtomixLeaderElectorCommands.GetLeadership.class, this::leadership);
+ executor.register(AtomixLeaderElectorCommands.GetAllLeaderships.class, this::allLeaderships);
+ executor.register(AtomixLeaderElectorCommands.GetElectedTopics.class, this::electedTopics);
+ }
+
+ private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
+ Change<Leadership> change = new Change<>(previousLeadership, newLeadership);
+ listeners.values().forEach(listener -> listener.session().publish("change", change));
+ }
+
+ @Override
+ public void delete() {
+ // Close and clear Listeners
+ listeners.values().forEach(Commit::close);
+ listeners.clear();
+ }
+
+ /**
+ * Applies listen commits.
+ *
+ * @param commit listen commit
+ */
+ public void listen(Commit<? extends AtomixLeaderElectorCommands.Listen> commit) {
+ if (listeners.putIfAbsent(commit.session().id(), commit) != null) {
+ commit.close();
+ }
+ }
+
+ /**
+ * Applies unlisten commits.
+ *
+ * @param commit unlisten commit
+ */
+ public void unlisten(Commit<? extends AtomixLeaderElectorCommands.Unlisten> commit) {
+ try {
+ Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(commit.session().id());
+ if (listener != null) {
+ listener.close();
+ }
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Applies an {@link AtomixLeaderElectorCommands.Run} commit.
+ * @param commit commit entry
+ * @return topic leader. If no previous leader existed this is the node that just entered the race.
+ */
+ public Leadership run(Commit<? extends AtomixLeaderElectorCommands.Run> commit) {
+ try {
+ String topic = commit.operation().topic();
+ Leadership oldLeadership = leadership(topic);
+ Registration registration = new Registration(commit.operation().nodeId(), commit.session().id());
+ elections.compute(topic, (k, v) -> {
+ if (v == null) {
+ return new ElectionState(registration, termCounter(topic)::incrementAndGet);
+ } else {
+ if (!v.isDuplicate(registration)) {
+ return new ElectionState(v).addRegistration(registration, termCounter(topic)::incrementAndGet);
+ } else {
+ return v;
+ }
+ }
+ });
+ Leadership newLeadership = leadership(topic);
+
+ if (!Objects.equal(oldLeadership, newLeadership)) {
+ notifyLeadershipChange(oldLeadership, newLeadership);
+ }
+ return newLeadership;
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Applies an {@link AtomixLeaderElectorCommands.Withdraw} commit.
+ * @param commit withdraw commit
+ */
+ public void withdraw(Commit<? extends AtomixLeaderElectorCommands.Withdraw> commit) {
+ try {
+ String topic = commit.operation().topic();
+ Leadership oldLeadership = leadership(topic);
+ elections.computeIfPresent(topic, (k, v) -> v.cleanup(commit.session(),
+ termCounter(topic)::incrementAndGet));
+ Leadership newLeadership = leadership(topic);
+ if (!Objects.equal(oldLeadership, newLeadership)) {
+ notifyLeadershipChange(oldLeadership, newLeadership);
+ }
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Applies an {@link AtomixLeaderElectorCommands.Anoint} commit.
+ * @param commit anoint commit
+ * @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
+ */
+ public boolean anoint(Commit<? extends AtomixLeaderElectorCommands.Anoint> commit) {
+ try {
+ String topic = commit.operation().topic();
+ Leadership oldLeadership = leadership(topic);
+ ElectionState electionState = elections.computeIfPresent(topic,
+ (k, v) -> new ElectionState(v).transferLeadership(commit.operation().nodeId(), termCounter(topic)));
+ Leadership newLeadership = leadership(topic);
+ if (!Objects.equal(oldLeadership, newLeadership)) {
+ notifyLeadershipChange(oldLeadership, newLeadership);
+ }
+ return (electionState != null &&
+ electionState.leader() != null &&
+ commit.operation().nodeId().equals(electionState.leader().nodeId()));
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
+ * @param commit GetLeadership commit
+ * @return leader
+ */
+ public Leadership leadership(Commit<? extends AtomixLeaderElectorCommands.GetLeadership> commit) {
+ String topic = commit.operation().topic();
+ try {
+ return leadership(topic);
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Applies an {@link AtomixLeaderElectorCommands.GetElectedTopics} commit.
+ * @param commit commit entry
+ * @return set of topics for which the node is the leader
+ */
+ public Set<String> electedTopics(Commit<? extends AtomixLeaderElectorCommands.GetElectedTopics> commit) {
+ try {
+ NodeId nodeId = commit.operation().nodeId();
+ return Maps.filterEntries(elections, e -> {
+ Leader leader = leadership(e.getKey()).leader();
+ return leader != null && leader.nodeId().equals(nodeId);
+ }).keySet();
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Applies an {@link AtomixLeaderElectorCommands.GetAllLeaderships} commit.
+ * @param commit GetAllLeaderships commit
+ * @return topic to leader mapping
+ */
+ public Map<String, Leadership> allLeaderships(
+ Commit<? extends AtomixLeaderElectorCommands.GetAllLeaderships> commit) {
+ try {
+ return Maps.transformEntries(elections, (k, v) -> leadership(k));
+ } finally {
+ commit.close();
+ }
+ }
+
+ private Leadership leadership(String topic) {
+ return new Leadership(topic,
+ leader(topic),
+ candidates(topic));
+ }
+
+ private Leader leader(String topic) {
+ ElectionState electionState = elections.get(topic);
+ return electionState == null ? null : electionState.leader();
+ }
+
+ private List<NodeId> candidates(String topic) {
+ ElectionState electionState = elections.get(topic);
+ return electionState == null ? new LinkedList<>() : electionState.candidates();
+ }
+
+ private void onSessionEnd(Session session) {
+ Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session);
+ if (listener != null) {
+ listener.close();
+ }
+ Set<String> topics = elections.keySet();
+ topics.forEach(topic -> {
+ Leadership oldLeadership = leadership(topic);
+ elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
+ Leadership newLeadership = leadership(topic);
+ if (!Objects.equal(oldLeadership, newLeadership)) {
+ notifyLeadershipChange(oldLeadership, newLeadership);
+ }
+ });
+ }
+
+ private static class Registration {
+ private final NodeId nodeId;
+ private final long sessionId;
+
+ public Registration(NodeId nodeId, long sessionId) {
+ this.nodeId = nodeId;
+ this.sessionId = sessionId;
+ }
+
+ public NodeId nodeId() {
+ return nodeId;
+ }
+
+ public long sessionId() {
+ return sessionId;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("nodeId", nodeId)
+ .add("sessionId", sessionId)
+ .toString();
+ }
+ }
+
+ private static class ElectionState {
+ final Registration leader;
+ final long term;
+ final long termStartTime;
+ final List<Registration> registrations;
+
+ public ElectionState(Registration registration, Supplier<Long> termCounter) {
+ registrations = Arrays.asList(registration);
+ term = termCounter.get();
+ termStartTime = System.currentTimeMillis();
+ leader = registration;
+ }
+
+ public ElectionState(ElectionState other) {
+ registrations = Lists.newArrayList(other.registrations);
+ leader = other.leader;
+ term = other.term;
+ termStartTime = other.termStartTime;
+ }
+
+ public ElectionState(List<Registration> registrations,
+ Registration leader,
+ long term,
+ long termStartTime) {
+ this.registrations = Lists.newArrayList(registrations);
+ this.leader = leader;
+ this.term = term;
+ this.termStartTime = termStartTime;
+ }
+
+ public ElectionState cleanup(Session session, Supplier<Long> termCounter) {
+ Optional<Registration> registration =
+ registrations.stream().filter(r -> r.sessionId() == session.id()).findFirst();
+ if (registration.isPresent()) {
+ List<Registration> updatedRegistrations =
+ registrations.stream()
+ .filter(r -> r.sessionId() != session.id())
+ .collect(Collectors.toList());
+ if (leader.sessionId() == session.id()) {
+ if (updatedRegistrations.size() > 0) {
+ return new ElectionState(updatedRegistrations,
+ updatedRegistrations.get(0),
+ termCounter.get(),
+ System.currentTimeMillis());
+ } else {
+ return new ElectionState(updatedRegistrations, null, term, termStartTime);
+ }
+ } else {
+ return new ElectionState(updatedRegistrations, leader, term, termStartTime);
+ }
+ } else {
+ return this;
+ }
+ }
+
+ public boolean isDuplicate(Registration registration) {
+ return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
+ }
+
+ public Leader leader() {
+ if (leader == null) {
+ return null;
+ } else {
+ NodeId leaderNodeId = leader.nodeId();
+ return new Leader(leaderNodeId, term, termStartTime);
+ }
+ }
+
+ public List<NodeId> candidates() {
+ return registrations.stream().map(registration -> registration.nodeId()).collect(Collectors.toList());
+ }
+
+ public ElectionState addRegistration(Registration registration, Supplier<Long> termCounter) {
+ if (!registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId())) {
+ List<Registration> updatedRegistrations = new LinkedList<>(registrations);
+ updatedRegistrations.add(registration);
+ boolean newLeader = leader == null;
+ return new ElectionState(updatedRegistrations,
+ newLeader ? registration : leader,
+ newLeader ? termCounter.get() : term,
+ newLeader ? System.currentTimeMillis() : termStartTime);
+ }
+ return this;
+ }
+
+ public ElectionState transferLeadership(NodeId nodeId, AtomicLong termCounter) {
+ Registration newLeader = registrations.stream()
+ .filter(r -> r.nodeId().equals(nodeId))
+ .findFirst()
+ .orElse(null);
+ if (newLeader != null) {
+ return new ElectionState(registrations,
+ newLeader,
+ termCounter.incrementAndGet(),
+ System.currentTimeMillis());
+ } else {
+ return this;
+ }
+ }
+ }
+
+ @Override
+ public void register(Session session) {
+ }
+
+ @Override
+ public void unregister(Session session) {
+ onSessionEnd(session);
+ }
+
+ @Override
+ public void expire(Session session) {
+ onSessionEnd(session);
+ }
+
+ @Override
+ public void close(Session session) {
+ onSessionEnd(session);
+ }
+
+ @Override
+ public void snapshot(SnapshotWriter writer) {
+ byte[] encodedTermCounters = serializer.encode(termCounters);
+ writer.writeInt(encodedTermCounters.length);
+ writer.write(encodedTermCounters);
+ byte[] encodedElections = serializer.encode(elections);
+ writer.writeInt(encodedElections.length);
+ writer.write(encodedElections);
+ log.info("Took state machine snapshot");
+ }
+
+ @Override
+ public void install(SnapshotReader reader) {
+ int encodedTermCountersSize = reader.readInt();
+ byte[] encodedTermCounters = new byte[encodedTermCountersSize];
+ reader.read(encodedTermCounters);
+ termCounters = serializer.decode(encodedTermCounters);
+ int encodedElectionsSize = reader.readInt();
+ byte[] encodedElections = new byte[encodedElectionsSize];
+ reader.read(encodedElections);
+ elections = serializer.decode(encodedElections);
+ log.info("Reinstated state machine from snapshot");
+ }
+
+ private AtomicLong termCounter(String topic) {
+ return termCounters.computeIfAbsent(topic, k -> new AtomicLong(0));
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixValue.java
new file mode 100644
index 0000000..155fe01
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixValue.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.AtomicValueEvent;
+import org.onosproject.store.service.AtomicValueEventListener;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of {@link AsyncAtomicValue} backed by {@link AtomixConsistentMap}.
+ */
+public class AtomixValue implements AsyncAtomicValue<String> {
+
+ private final String name;
+ private final AtomixConsistentMap atomixMap;
+ private MapEventListener<String, byte[]> mapEventListener;
+ private final Set<AtomicValueEventListener<String>> listeners = Sets.newIdentityHashSet();
+
+ AtomixValue(String name, AtomixConsistentMap atomixMap) {
+ this.name = name;
+ this.atomixMap = atomixMap;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> compareAndSet(String expect, String update) {
+ return atomixMap.replace(name, Tools.getBytesUtf8(expect), Tools.getBytesUtf8(update));
+ }
+
+ @Override
+ public CompletableFuture<String> get() {
+ return atomixMap.get(name)
+ .thenApply(v -> v != null ? Tools.toStringUtf8(v.value()) : null);
+ }
+
+ @Override
+ public CompletableFuture<String> getAndSet(String value) {
+ return atomixMap.put(name, Tools.getBytesUtf8(value))
+ .thenApply(v -> v != null ? Tools.toStringUtf8(v.value()) : null);
+ }
+
+ @Override
+ public CompletableFuture<Void> set(String value) {
+ return getAndSet(value).thenApply(v -> null);
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(AtomicValueEventListener<String> listener) {
+ // TODO: synchronization
+ if (mapEventListener == null) {
+ mapEventListener = event -> {
+ Versioned<byte[]> newValue = event.newValue();
+ Versioned<byte[]> oldValue = event.oldValue();
+ if (Objects.equals(event.key(), name)) {
+ listener.event(new AtomicValueEvent<>(name,
+ newValue == null ? null : Tools.toStringUtf8(newValue.value()),
+ oldValue == null ? null : Tools.toStringUtf8(oldValue.value())));
+ }
+ };
+ return atomixMap.addListener(mapEventListener).whenComplete((r, e) -> {
+ if (e == null) {
+ listeners.add(listener);
+ } else {
+ mapEventListener = null;
+ }
+ });
+ } else {
+ listeners.add(listener);
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(AtomicValueEventListener<String> listener) {
+ // TODO: synchronization
+ listeners.remove(listener);
+ if (listeners.isEmpty()) {
+ return atomixMap.removeListener(mapEventListener);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ @Override
+ public String name() {
+ return null;
+ }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/CommitResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/CommitResult.java
new file mode 100644
index 0000000..2b43f83
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/CommitResult.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+/**
+ * Response enum for two phase commit operation.
+ */
+public enum CommitResult {
+ /**
+ * Signifies a successful commit execution.
+ */
+ OK,
+
+ /**
+ * Signifies a failure due to unrecognized transaction identifier.
+ */
+ UNKNOWN_TRANSACTION_ID,
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java
new file mode 100644
index 0000000..7858595
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java
@@ -0,0 +1,157 @@
+
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.function.Function;
+
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Result of a map entry update operation.
+ * <p>
+ * Both old and new values are accessible along with a flag that indicates if the
+ * the value was updated. If flag is false, oldValue and newValue both
+ * point to the same unmodified value.
+ * @param <V> result type
+ */
+public class MapEntryUpdateResult<K, V> {
+
+ public enum Status {
+
+ /**
+ * Indicates a successful update.
+ */
+ OK,
+
+ /**
+ * Indicates a noop i.e. existing and new value are both null.
+ */
+ NOOP,
+
+ /**
+ * Indicates a failed update due to a write lock.
+ */
+ WRITE_LOCK,
+
+ /**
+ * Indicates a failed update due to a precondition check failure.
+ */
+ PRECONDITION_FAILED
+ }
+
+ private final String mapName;
+ private Status status;
+ private final K key;
+ private final Versioned<V> oldValue;
+ private final Versioned<V> newValue;
+
+ public MapEntryUpdateResult(Status status, String mapName, K key, Versioned<V> oldValue, Versioned<V> newValue) {
+ this.status = status;
+ this.mapName = mapName;
+ this.key = key;
+ this.oldValue = oldValue;
+ this.newValue = newValue;
+ }
+
+ /**
+ * Returns {@code true} if the update was successful.
+ * @return {@code true} if yes, {@code false} otherwise
+ */
+ public boolean updated() {
+ return status == Status.OK;
+ }
+
+ /**
+ * Returns the map name.
+ * @return map name
+ */
+ public String mapName() {
+ return mapName;
+ }
+
+ /**
+ * Returns the update status.
+ * @return update status
+ */
+ public Status status() {
+ return status;
+ }
+
+ /**
+ * Returns the map key.
+ * @return key
+ */
+ public K key() {
+ return key;
+ }
+
+ /**
+ * Returns the old value.
+ * @return the previous value associated with key if updated was successful, otherwise current value
+ */
+ public Versioned<V> oldValue() {
+ return oldValue;
+ }
+
+ /**
+ * Returns the new value after update.
+ * @return if updated was unsuccessful, this is same as old value
+ */
+ public Versioned<V> newValue() {
+ return newValue;
+ }
+
+ /**
+ * Maps to another instance with different key and value types.
+ * @param keyTransform transformer to use for transcoding keys
+ * @param valueMapper mapper to use for transcoding values
+ * @return new instance
+ */
+ public <K1, V1> MapEntryUpdateResult<K1, V1> map(Function<K, K1> keyTransform, Function<V, V1> valueMapper) {
+ return new MapEntryUpdateResult<>(status,
+ mapName,
+ keyTransform.apply(key),
+ oldValue == null ? null : oldValue.map(valueMapper),
+ newValue == null ? null : newValue.map(valueMapper));
+ }
+
+ /**
+ * Return the map event that will be generated as a result of this update.
+ * @return map event. if update was unsuccessful, this returns {@code null}
+ */
+ public MapEvent<K, V> toMapEvent() {
+ if (!updated()) {
+ return null;
+ } else {
+ return new MapEvent<>(mapName(), key(), newValue, oldValue);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(MapEntryUpdateResult.class)
+ .add("mapName", mapName)
+ .add("status", status)
+ .add("key", key)
+ .add("oldValue", oldValue)
+ .add("newValue", newValue)
+ .toString();
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapUpdate.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapUpdate.java
new file mode 100644
index 0000000..c105a08
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapUpdate.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives.resources.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Map update operation.
+ *
+ * @param <K> map key type
+ * @param <V> map value type
+ *
+ */
+public final class MapUpdate<K, V> {
+
+ /**
+ * Type of database update operation.
+ */
+ public enum Type {
+ /**
+ * Insert/Update entry without any checks.
+ */
+ PUT,
+ /**
+ * Insert an entry iff there is no existing entry for that key.
+ */
+ PUT_IF_ABSENT,
+
+ /**
+ * Update entry if the current version matches specified version.
+ */
+ PUT_IF_VERSION_MATCH,
+
+ /**
+ * Update entry if the current value matches specified value.
+ */
+ PUT_IF_VALUE_MATCH,
+
+ /**
+ * Remove entry without any checks.
+ */
+ REMOVE,
+
+ /**
+ * Remove entry if the current version matches specified version.
+ */
+ REMOVE_IF_VERSION_MATCH,
+
+ /**
+ * Remove entry if the current value matches specified value.
+ */
+ REMOVE_IF_VALUE_MATCH,
+ }
+
+ private Type type;
+ private K key;
+ private V value;
+ private V currentValue;
+ private long currentVersion = -1;
+
+ /**
+ * Returns the type of update operation.
+ * @return type of update.
+ */
+ public Type type() {
+ return type;
+ }
+
+ /**
+ * Returns the item key being updated.
+ * @return item key
+ */
+ public K key() {
+ return key;
+ }
+
+ /**
+ * Returns the new value.
+ * @return item's target value.
+ */
+ public V value() {
+ return value;
+ }
+
+ /**
+ * Returns the expected current value for the key.
+ * @return current value in database.
+ */
+ public V currentValue() {
+ return currentValue;
+ }
+
+ /**
+ * Returns the expected current version in the database for the key.
+ * @return expected version.
+ */
+ public long currentVersion() {
+ return currentVersion;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("type", type)
+ .add("key", key)
+ .add("value", value)
+ .add("currentValue", currentValue)
+ .add("currentVersion", currentVersion)
+ .toString();
+ }
+
+ /**
+ * Creates a new builder instance.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ * @return builder.
+ */
+ public static <K, V> Builder<K, V> newBuilder() {
+ return new Builder<>();
+ }
+
+ /**
+ * MapUpdate builder.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+ public static final class Builder<K, V> {
+
+ private MapUpdate<K, V> update = new MapUpdate<>();
+
+ public MapUpdate<K, V> build() {
+ validateInputs();
+ return update;
+ }
+
+ public Builder<K, V> withType(Type type) {
+ update.type = checkNotNull(type, "type cannot be null");
+ return this;
+ }
+
+ public Builder<K, V> withKey(K key) {
+ update.key = checkNotNull(key, "key cannot be null");
+ return this;
+ }
+
+ public Builder<K, V> withCurrentValue(V value) {
+ update.currentValue = checkNotNull(value, "currentValue cannot be null");
+ return this;
+ }
+
+ public Builder<K, V> withValue(V value) {
+ update.value = checkNotNull(value, "value cannot be null");
+ return this;
+ }
+
+ public Builder<K, V> withCurrentVersion(long version) {
+ checkArgument(version >= 0, "version cannot be negative");
+ update.currentVersion = version;
+ return this;
+ }
+
+ private void validateInputs() {
+ checkNotNull(update.type, "type must be specified");
+ checkNotNull(update.key, "key must be specified");
+ switch (update.type) {
+ case PUT:
+ case PUT_IF_ABSENT:
+ checkNotNull(update.value, "value must be specified.");
+ break;
+ case PUT_IF_VERSION_MATCH:
+ checkNotNull(update.value, "value must be specified.");
+ checkState(update.currentVersion >= 0, "current version must be specified");
+ break;
+ case PUT_IF_VALUE_MATCH:
+ checkNotNull(update.value, "value must be specified.");
+ checkNotNull(update.currentValue, "currentValue must be specified.");
+ break;
+ case REMOVE:
+ break;
+ case REMOVE_IF_VERSION_MATCH:
+ checkState(update.currentVersion >= 0, "current version must be specified");
+ break;
+ case REMOVE_IF_VALUE_MATCH:
+ checkNotNull(update.currentValue, "currentValue must be specified.");
+ break;
+ default:
+ throw new IllegalStateException("Unknown operation type");
+ }
+ }
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/PrepareResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/PrepareResult.java
new file mode 100644
index 0000000..0465743
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/PrepareResult.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+/**
+ * Response enum for two phase commit prepare operation.
+ */
+public enum PrepareResult {
+ /**
+ * Signifies a successful execution of the prepare operation.
+ */
+ OK,
+
+ /**
+ * Signifies a failure to another transaction locking the underlying state.
+ */
+ CONCURRENT_TRANSACTION,
+
+ /**
+ * Signifies a optimistic lock failure. This can happen if underlying state has changed since it was last read.
+ */
+ OPTIMISTIC_LOCK_FAILURE,
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/RollbackResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/RollbackResult.java
new file mode 100644
index 0000000..6f0790f
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/RollbackResult.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+/**
+ * Response enum for two phase commit rollback operation.
+ */
+public enum RollbackResult {
+ /**
+ * Signifies a successful rollback execution.
+ */
+ OK,
+
+ /**
+ * Signifies a failure due to unrecognized transaction identifier.
+ */
+ UNKNOWN_TRANSACTION_ID,
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionId.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionId.java
new file mode 100644
index 0000000..e378580
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionId.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import com.google.common.base.Objects;
+
+/**
+ * Transaction identifier.
+ */
+public final class TransactionId {
+
+ public static TransactionId from(String id) {
+ return new TransactionId(id);
+ }
+
+ private final String id;
+
+ private TransactionId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String toString() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other instanceof TransactionId) {
+ TransactionId that = (TransactionId) other;
+ return Objects.equal(this.id, that.id);
+ }
+ return false;
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionalMapUpdate.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionalMapUpdate.java
new file mode 100644
index 0000000..6e83b69
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionalMapUpdate.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+
+/**
+ * A batch updates to an {@code AsyncConsistentMap} be committed as a transaction.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class TransactionalMapUpdate<K, V> {
+ private final TransactionId transactionId;
+ private final Collection<MapUpdate<K, V>> updates;
+ private boolean indexPopulated = false;
+ private final Map<K, V> keyValueIndex = Maps.newHashMap();
+
+ public TransactionalMapUpdate(TransactionId transactionId, Collection<MapUpdate<K, V>> updates) {
+ this.transactionId = transactionId;
+ this.updates = ImmutableList.copyOf(updates);
+ populateIndex();
+ }
+
+ /**
+ * Returns the transaction identifier.
+ * @return transaction id
+ */
+ public TransactionId transactionId() {
+ return transactionId;
+ }
+
+ /**
+ * Returns the collection of map updates.
+ * @return map updates
+ */
+ public Collection<MapUpdate<K, V>> batch() {
+ return updates;
+ }
+
+ /**
+ * Returns the value that will be associated with the key after this transaction commits.
+ * @param key key
+ * @return value that will be associated with the value once this transaction commits
+ */
+ public V valueForKey(K key) {
+ if (!indexPopulated) {
+ // We do not synchronize as we don't expect this called to be made from multiple threads.
+ populateIndex();
+ }
+ return keyValueIndex.get(key);
+ }
+
+ /**
+ * Populates the internal key -> value mapping.
+ */
+ private synchronized void populateIndex() {
+ updates.forEach(mapUpdate -> {
+ if (mapUpdate.value() != null) {
+ keyValueIndex.put(mapUpdate.key(), mapUpdate.value());
+ }
+ });
+ indexPopulated = true;
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/package-info.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/package-info.java
new file mode 100644
index 0000000..9cf8c15
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * State machine implementation classes for distributed primitives.
+ */
+package org.onosproject.store.primitives.resources.impl;
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
new file mode 100644
index 0000000..445dda4
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -0,0 +1,447 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.resource.ResourceType;
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests for {@link AtomixConsistentMap}.
+ */
+public class AtomixConsistentMapTest extends AtomixTestBase {
+
+ @Override
+ protected ResourceType resourceType() {
+ return new ResourceType(AtomixConsistentMap.class);
+ }
+
+ /**
+ * Tests various basic map operations.
+ */
+ @Test
+ public void testBasicMapOperations() throws Throwable {
+ basicMapOperationTests(1);
+ clearTests();
+ basicMapOperationTests(2);
+ clearTests();
+ basicMapOperationTests(3);
+ }
+
+ /**
+ * Tests various map compute* operations on different cluster sizes.
+ */
+ @Test
+ public void testMapComputeOperations() throws Throwable {
+ mapComputeOperationTests(1);
+ clearTests();
+ mapComputeOperationTests(2);
+ clearTests();
+ mapComputeOperationTests(3);
+ }
+
+ /**
+ * Tests map event notifications.
+ */
+ @Test
+ public void testMapListeners() throws Throwable {
+ mapListenerTests(1);
+ clearTests();
+ mapListenerTests(2);
+ clearTests();
+ mapListenerTests(3);
+ }
+
+ /**
+ * Tests map transaction commit.
+ */
+ @Test
+ public void testTransactionCommit() throws Throwable {
+ transactionCommitTests(1);
+ clearTests();
+ transactionCommitTests(2);
+ clearTests();
+ transactionCommitTests(3);
+ }
+
+ /**
+ * Tests map transaction rollback.
+ */
+ @Test
+ public void testTransactionRollback() throws Throwable {
+ transactionRollbackTests(1);
+ clearTests();
+ transactionRollbackTests(2);
+ clearTests();
+ transactionRollbackTests(3);
+ }
+
+ protected void basicMapOperationTests(int clusterSize) throws Throwable {
+ createCopycatServers(clusterSize);
+
+ final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
+ final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
+
+ AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+
+ map.isEmpty().thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+
+ map.put("foo", rawFooValue).thenAccept(result -> {
+ assertNull(result);
+ }).join();
+
+ map.size().thenAccept(result -> {
+ assertTrue(result == 1);
+ }).join();
+
+ map.isEmpty().thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+
+ map.putIfAbsent("foo", "Hello foo again!".getBytes()).thenAccept(result -> {
+ assertNotNull(result);
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
+ }).join();
+
+ map.putIfAbsent("bar", rawBarValue).thenAccept(result -> {
+ assertNull(result);
+ }).join();
+
+ map.size().thenAccept(result -> {
+ assertTrue(result == 2);
+ }).join();
+
+ map.keySet().thenAccept(result -> {
+ assertTrue(result.size() == 2);
+ assertTrue(result.containsAll(Sets.newHashSet("foo", "bar")));
+ }).join();
+
+ map.values().thenAccept(result -> {
+ assertTrue(result.size() == 2);
+ List<String> rawValues =
+ result.stream().map(v -> Tools.toStringUtf8(v.value())).collect(Collectors.toList());
+ assertTrue(rawValues.contains("Hello foo!"));
+ assertTrue(rawValues.contains("Hello bar!"));
+ }).join();
+
+ map.entrySet().thenAccept(result -> {
+ assertTrue(result.size() == 2);
+ // TODO: check entries
+ }).join();
+
+ map.get("foo").thenAccept(result -> {
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
+ }).join();
+
+ map.remove("foo").thenAccept(result -> {
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
+ }).join();
+
+ map.containsKey("foo").thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+
+ map.get("foo").thenAccept(result -> {
+ assertNull(result);
+ }).join();
+
+ map.get("bar").thenAccept(result -> {
+ assertNotNull(result);
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
+ }).join();
+
+ map.containsKey("bar").thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+
+ map.size().thenAccept(result -> {
+ assertTrue(result == 1);
+ }).join();
+
+ map.containsValue(rawBarValue).thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+
+ map.containsValue(rawFooValue).thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+
+ map.replace("bar", "Goodbye bar!".getBytes()).thenAccept(result -> {
+ assertNotNull(result);
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
+ }).join();
+
+ map.replace("foo", "Goodbye foo!".getBytes()).thenAccept(result -> {
+ assertNull(result);
+ }).join();
+
+ // try replace_if_value_match for a non-existent key
+ map.replace("foo", "Goodbye foo!".getBytes(), rawFooValue).thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+
+ map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+
+ map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+
+ Versioned<byte[]> barValue = map.get("bar").join();
+ map.replace("bar", barValue.version(), "Goodbye bar!".getBytes()).thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+
+ map.replace("bar", barValue.version(), rawBarValue).thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+
+ map.clear().join();
+
+ map.size().thenAccept(result -> {
+ assertTrue(result == 0);
+ }).join();
+ }
+
+ public void mapComputeOperationTests(int clusterSize) throws Throwable {
+ createCopycatServers(clusterSize);
+ final byte[] value1 = Tools.getBytesUtf8("value1");
+ final byte[] value2 = Tools.getBytesUtf8("value2");
+ final byte[] value3 = Tools.getBytesUtf8("value3");
+
+ AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+
+ map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
+ }).join();
+
+ map.computeIfAbsent("foo", k -> value2).thenAccept(result -> {
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
+ }).join();
+
+ map.computeIfPresent("bar", (k, v) -> value2).thenAccept(result -> {
+ assertNull(result);
+ });
+
+ map.computeIfPresent("foo", (k, v) -> value3).thenAccept(result -> {
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value3));
+ }).join();
+
+ map.computeIfPresent("foo", (k, v) -> null).thenAccept(result -> {
+ assertNull(result);
+ }).join();
+
+ map.computeIf("foo", v -> v == null, (k, v) -> value1).thenAccept(result -> {
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
+ }).join();
+
+ map.compute("foo", (k, v) -> value2).thenAccept(result -> {
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value2));
+ }).join();
+ }
+
+
+ protected void mapListenerTests(int clusterSize) throws Throwable {
+ createCopycatServers(clusterSize);
+ final byte[] value1 = Tools.getBytesUtf8("value1");
+ final byte[] value2 = Tools.getBytesUtf8("value2");
+ final byte[] value3 = Tools.getBytesUtf8("value3");
+
+ AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+ TestMapEventListener listener = new TestMapEventListener();
+
+ // add listener; insert new value into map and verify an INSERT event is received.
+ map.addListener(listener).join();
+ map.put("foo", value1).join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.INSERT, listener.event().type());
+ assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
+ listener.clearEvent();
+
+ // remove listener and verify listener is not notified.
+ map.removeListener(listener).join();
+ map.put("foo", value2).join();
+ assertNull(listener.event());
+
+ // add the listener back and verify UPDATE events are received correctly
+ map.addListener(listener).join();
+ map.put("foo", value3).join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.UPDATE, listener.event().type());
+ assertTrue(Arrays.equals(value3, listener.event().newValue().value()));
+ listener.clearEvent();
+
+ // perform a non-state changing operation and verify no events are received.
+ map.putIfAbsent("foo", value1).join();
+ assertNull(listener.event());
+
+ // verify REMOVE events are received correctly.
+ map.remove("foo").join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.REMOVE, listener.event().type());
+ assertTrue(Arrays.equals(value3, listener.event().oldValue().value()));
+ listener.clearEvent();
+
+ // verify compute methods also generate events.
+ map.computeIf("foo", v -> v == null, (k, v) -> value1).join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.INSERT, listener.event().type());
+ assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
+ listener.clearEvent();
+
+ map.compute("foo", (k, v) -> value2).join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.UPDATE, listener.event().type());
+ assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
+ listener.clearEvent();
+
+ map.computeIf("foo", v -> Arrays.equals(v, value2), (k, v) -> null).join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.REMOVE, listener.event().type());
+ assertTrue(Arrays.equals(value2, listener.event().oldValue().value()));
+ listener.clearEvent();
+
+ map.removeListener(listener).join();
+ }
+
+ protected void transactionCommitTests(int clusterSize) throws Throwable {
+ createCopycatServers(clusterSize);
+ final byte[] value1 = Tools.getBytesUtf8("value1");
+ final byte[] value2 = Tools.getBytesUtf8("value2");
+
+ AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+ TestMapEventListener listener = new TestMapEventListener();
+
+ map.addListener(listener).join();
+
+ MapUpdate<String, byte[]> update1 =
+ MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
+ .withKey("foo")
+ .withValue(value1)
+ .build();
+
+ TransactionalMapUpdate<String, byte[]> txMapUpdate =
+ new TransactionalMapUpdate<>(TransactionId.from("tx1"), Arrays.asList(update1));
+
+ map.prepare(txMapUpdate).thenAccept(result -> {
+ assertEquals(PrepareResult.OK, result);
+ }).join();
+ assertNull(listener.event());
+
+ map.size().thenAccept(result -> {
+ assertTrue(result == 0);
+ }).join();
+
+ map.get("foo").thenAccept(result -> {
+ assertNull(result);
+ }).join();
+
+ try {
+ map.put("foo", value2).join();
+ assertTrue(false);
+ } catch (CompletionException e) {
+ assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
+ }
+
+ assertNull(listener.event());
+
+ map.commit(txMapUpdate.transactionId()).join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.INSERT, listener.event().type());
+ assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
+ listener.clearEvent();
+
+ map.put("foo", value2).thenAccept(result -> {
+ assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
+ }).join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.UPDATE, listener.event().type());
+ assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
+ listener.clearEvent();
+ }
+
+ protected void transactionRollbackTests(int clusterSize) throws Throwable {
+ createCopycatServers(clusterSize);
+ final byte[] value1 = Tools.getBytesUtf8("value1");
+ final byte[] value2 = Tools.getBytesUtf8("value2");
+
+ AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+ TestMapEventListener listener = new TestMapEventListener();
+
+ map.addListener(listener).join();
+
+ MapUpdate<String, byte[]> update1 =
+ MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
+ .withKey("foo")
+ .withValue(value1)
+ .build();
+ TransactionalMapUpdate<String, byte[]> txMapUpdate =
+ new TransactionalMapUpdate<>(TransactionId.from("tx1"), Arrays.asList(update1));
+ map.prepare(txMapUpdate).thenAccept(result -> {
+ assertEquals(PrepareResult.OK, result);
+ }).join();
+ assertNull(listener.event());
+
+ map.rollback(txMapUpdate.transactionId()).join();
+ assertNull(listener.event());
+
+ map.get("foo").thenAccept(result -> {
+ assertNull(result);
+ }).join();
+
+ map.put("foo", value2).thenAccept(result -> {
+ assertNull(result);
+ }).join();
+ assertNotNull(listener.event());
+ assertEquals(MapEvent.Type.INSERT, listener.event().type());
+ assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
+ listener.clearEvent();
+ }
+
+ private static class TestMapEventListener implements MapEventListener<String, byte[]> {
+
+ MapEvent<String, byte[]> event;
+
+ @Override
+ public void event(MapEvent<String, byte[]> event) {
+ this.event = event;
+ }
+
+ public MapEvent<String, byte[]> event() {
+ return event;
+ }
+
+ public void clearEvent() {
+ event = null;
+ }
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
new file mode 100644
index 0000000..7d77d03
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
@@ -0,0 +1,339 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+
+import io.atomix.Atomix;
+import io.atomix.resource.ResourceType;
+
+/**
+ * Unit tests for {@link AtomixLeaderElector}.
+ */
+public class AtomixLeaderElectorTest extends AtomixTestBase {
+
+ NodeId node1 = new NodeId("node1");
+ NodeId node2 = new NodeId("node2");
+ NodeId node3 = new NodeId("node3");
+
+ @Override
+ protected ResourceType resourceType() {
+ return new ResourceType(AtomixLeaderElector.class);
+ }
+
+ @Test
+ public void testRun() throws Throwable {
+ leaderElectorRunTests(1);
+ clearTests();
+// leaderElectorRunTests(2);
+// clearTests();
+// leaderElectorRunTests(3);
+// clearTests();
+ }
+
+ private void leaderElectorRunTests(int numServers) throws Throwable {
+ createCopycatServers(numServers);
+ Atomix client1 = createAtomixClient();
+ AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+ elector1.run("foo", node1).thenAccept(result -> {
+ assertEquals(node1, result.leaderNodeId());
+ assertEquals(1, result.leader().term());
+ assertEquals(1, result.candidates().size());
+ assertEquals(node1, result.candidates().get(0));
+ }).join();
+ Atomix client2 = createAtomixClient();
+ AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+ elector2.run("foo", node2).thenAccept(result -> {
+ assertEquals(node1, result.leaderNodeId());
+ assertEquals(1, result.leader().term());
+ assertEquals(2, result.candidates().size());
+ assertEquals(node1, result.candidates().get(0));
+ assertEquals(node2, result.candidates().get(1));
+ }).join();
+ }
+
+ @Test
+ public void testWithdraw() throws Throwable {
+ leaderElectorWithdrawTests(1);
+ clearTests();
+ leaderElectorWithdrawTests(2);
+ clearTests();
+ leaderElectorWithdrawTests(3);
+ clearTests();
+ }
+
+ private void leaderElectorWithdrawTests(int numServers) throws Throwable {
+ createCopycatServers(numServers);
+ Atomix client1 = createAtomixClient();
+ AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+ elector1.run("foo", node1).join();
+ Atomix client2 = createAtomixClient();
+ AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+ elector2.run("foo", node2).join();
+
+ LeaderEventListener listener1 = new LeaderEventListener();
+ elector1.addChangeListener(listener1).join();
+
+ LeaderEventListener listener2 = new LeaderEventListener();
+ elector2.addChangeListener(listener2).join();
+
+ elector1.withdraw("foo").join();
+
+ listener1.nextEvent().thenAccept(result -> {
+ assertEquals(node2, result.newValue().leaderNodeId());
+ assertEquals(2, result.newValue().leader().term());
+ assertEquals(1, result.newValue().candidates().size());
+ assertEquals(node2, result.newValue().candidates().get(0));
+ }).join();
+
+ listener2.nextEvent().thenAccept(result -> {
+ assertEquals(node2, result.newValue().leaderNodeId());
+ assertEquals(2, result.newValue().leader().term());
+ assertEquals(1, result.newValue().candidates().size());
+ assertEquals(node2, result.newValue().candidates().get(0));
+ }).join();
+ }
+
+ @Test
+ public void testAnoint() throws Throwable {
+ leaderElectorAnointTests(1);
+ clearTests();
+ leaderElectorAnointTests(2);
+ clearTests();
+ leaderElectorAnointTests(3);
+ clearTests();
+ }
+
+ private void leaderElectorAnointTests(int numServers) throws Throwable {
+ createCopycatServers(numServers);
+ Atomix client1 = createAtomixClient();
+ AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+ Atomix client2 = createAtomixClient();
+ AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+ Atomix client3 = createAtomixClient();
+ AtomixLeaderElector elector3 = client3.get("test-elector", AtomixLeaderElector.class).join();
+ elector1.run("foo", node1).join();
+ elector2.run("foo", node2).join();
+
+ LeaderEventListener listener1 = new LeaderEventListener();
+ elector1.addChangeListener(listener1).join();
+ LeaderEventListener listener2 = new LeaderEventListener();
+ elector2.addChangeListener(listener2);
+ LeaderEventListener listener3 = new LeaderEventListener();
+ elector3.addChangeListener(listener3).join();
+
+ elector3.anoint("foo", node3).thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+ assertFalse(listener1.hasEvent());
+ assertFalse(listener2.hasEvent());
+ assertFalse(listener3.hasEvent());
+
+ elector3.anoint("foo", node2).thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+ assertTrue(listener1.hasEvent());
+ assertTrue(listener2.hasEvent());
+ assertTrue(listener3.hasEvent());
+
+ listener1.nextEvent().thenAccept(result -> {
+ assertEquals(node2, result.newValue().leaderNodeId());
+ assertEquals(2, result.newValue().candidates().size());
+ assertEquals(node1, result.newValue().candidates().get(0));
+ assertEquals(node2, result.newValue().candidates().get(1));
+ }).join();
+ listener2.nextEvent().thenAccept(result -> {
+ assertEquals(node2, result.newValue().leaderNodeId());
+ assertEquals(2, result.newValue().candidates().size());
+ assertEquals(node1, result.newValue().candidates().get(0));
+ assertEquals(node2, result.newValue().candidates().get(1));
+ }).join();
+ listener3.nextEvent().thenAccept(result -> {
+ assertEquals(node2, result.newValue().leaderNodeId());
+ assertEquals(2, result.newValue().candidates().size());
+ assertEquals(node1, result.newValue().candidates().get(0));
+ assertEquals(node2, result.newValue().candidates().get(1));
+ }).join();
+ }
+
+ @Test
+ public void testLeaderSessionClose() throws Throwable {
+ leaderElectorLeaderSessionCloseTests(1);
+ clearTests();
+ leaderElectorLeaderSessionCloseTests(2);
+ clearTests();
+ leaderElectorLeaderSessionCloseTests(3);
+ clearTests();
+ }
+
+ private void leaderElectorLeaderSessionCloseTests(int numServers) throws Throwable {
+ createCopycatServers(numServers);
+ Atomix client1 = createAtomixClient();
+ AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+ elector1.run("foo", node1).join();
+ Atomix client2 = createAtomixClient();
+ AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+ LeaderEventListener listener = new LeaderEventListener();
+ elector2.run("foo", node2).join();
+ elector2.addChangeListener(listener).join();
+ client1.close();
+ listener.nextEvent().thenAccept(result -> {
+ assertEquals(node2, result.newValue().leaderNodeId());
+ assertEquals(1, result.newValue().candidates().size());
+ assertEquals(node2, result.newValue().candidates().get(0));
+ }).join();
+ }
+
+ @Test
+ public void testNonLeaderSessionClose() throws Throwable {
+ leaderElectorNonLeaderSessionCloseTests(1);
+ clearTests();
+ leaderElectorNonLeaderSessionCloseTests(2);
+ clearTests();
+ leaderElectorNonLeaderSessionCloseTests(3);
+ clearTests();
+ }
+
+ private void leaderElectorNonLeaderSessionCloseTests(int numServers) throws Throwable {
+ createCopycatServers(numServers);
+ Atomix client1 = createAtomixClient();
+ AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+ elector1.run("foo", node1).join();
+ Atomix client2 = createAtomixClient();
+ AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+ LeaderEventListener listener = new LeaderEventListener();
+ elector2.run("foo", node2).join();
+ elector1.addChangeListener(listener).join();
+ client2.close().join();
+ listener.nextEvent().thenAccept(result -> {
+ assertEquals(node1, result.newValue().leaderNodeId());
+ assertEquals(1, result.newValue().candidates().size());
+ assertEquals(node1, result.newValue().candidates().get(0));
+ }).join();
+ }
+
+ @Test
+ public void testQueries() throws Throwable {
+ leaderElectorQueryTests(1);
+ clearTests();
+ leaderElectorQueryTests(2);
+ clearTests();
+ leaderElectorQueryTests(3);
+ clearTests();
+ }
+
+ private void leaderElectorQueryTests(int numServers) throws Throwable {
+ createCopycatServers(numServers);
+ Atomix client1 = createAtomixClient();
+ Atomix client2 = createAtomixClient();
+ AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+ AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+ elector1.run("foo", node1).join();
+ elector2.run("foo", node2).join();
+ elector2.run("bar", node2).join();
+ elector1.getElectedTopics(node1).thenAccept(result -> {
+ assertEquals(1, result.size());
+ assertTrue(result.contains("foo"));
+ }).join();
+ elector2.getElectedTopics(node1).thenAccept(result -> {
+ assertEquals(1, result.size());
+ assertTrue(result.contains("foo"));
+ }).join();
+ elector1.getLeadership("foo").thenAccept(result -> {
+ assertEquals(node1, result.leaderNodeId());
+ assertEquals(node1, result.candidates().get(0));
+ assertEquals(node2, result.candidates().get(1));
+ }).join();
+ elector2.getLeadership("foo").thenAccept(result -> {
+ assertEquals(node1, result.leaderNodeId());
+ assertEquals(node1, result.candidates().get(0));
+ assertEquals(node2, result.candidates().get(1));
+ }).join();
+ elector1.getLeadership("bar").thenAccept(result -> {
+ assertEquals(node2, result.leaderNodeId());
+ assertEquals(node2, result.candidates().get(0));
+ }).join();
+ elector2.getLeadership("bar").thenAccept(result -> {
+ assertEquals(node2, result.leaderNodeId());
+ assertEquals(node2, result.candidates().get(0));
+ }).join();
+ elector1.getLeaderships().thenAccept(result -> {
+ assertEquals(2, result.size());
+ Leadership fooLeadership = result.get("foo");
+ assertEquals(node1, fooLeadership.leaderNodeId());
+ assertEquals(node1, fooLeadership.candidates().get(0));
+ assertEquals(node2, fooLeadership.candidates().get(1));
+ Leadership barLeadership = result.get("bar");
+ assertEquals(node2, barLeadership.leaderNodeId());
+ assertEquals(node2, barLeadership.candidates().get(0));
+ }).join();
+ elector2.getLeaderships().thenAccept(result -> {
+ assertEquals(2, result.size());
+ Leadership fooLeadership = result.get("foo");
+ assertEquals(node1, fooLeadership.leaderNodeId());
+ assertEquals(node1, fooLeadership.candidates().get(0));
+ assertEquals(node2, fooLeadership.candidates().get(1));
+ Leadership barLeadership = result.get("bar");
+ assertEquals(node2, barLeadership.leaderNodeId());
+ assertEquals(node2, barLeadership.candidates().get(0));
+ }).join();
+ }
+
+ private static class LeaderEventListener implements Consumer<Change<Leadership>> {
+ Queue<Change<Leadership>> eventQueue = new LinkedList<>();
+ CompletableFuture<Change<Leadership>> pendingFuture;
+
+ @Override
+ public void accept(Change<Leadership> change) {
+ synchronized (this) {
+ if (pendingFuture != null) {
+ pendingFuture.complete(change);
+ pendingFuture = null;
+ } else {
+ eventQueue.add(change);
+ }
+ }
+ }
+
+ public boolean hasEvent() {
+ return !eventQueue.isEmpty();
+ }
+
+ public CompletableFuture<Change<Leadership>> nextEvent() {
+ synchronized (this) {
+ if (eventQueue.isEmpty()) {
+ if (pendingFuture == null) {
+ pendingFuture = new CompletableFuture<>();
+ }
+ return pendingFuture;
+ } else {
+ return CompletableFuture.completedFuture(eventQueue.poll());
+ }
+ }
+ }
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
new file mode 100644
index 0000000..d38400d
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import io.atomix.Atomix;
+import io.atomix.resource.ResourceType;
+import io.atomix.variables.DistributedLong;
+
+/**
+ * Unit tests for {@link AtomixCounter}.
+ */
+public class AtomixLongTest extends AtomixTestBase {
+
+ @Override
+ protected ResourceType resourceType() {
+ return new ResourceType(DistributedLong.class);
+ }
+
+ @Test
+ public void testBasicOperations() throws Throwable {
+ basicOperationsTest(1);
+ clearTests();
+ basicOperationsTest(2);
+ clearTests();
+ basicOperationsTest(3);
+ clearTests();
+ }
+
+ protected void basicOperationsTest(int clusterSize) throws Throwable {
+ createCopycatServers(clusterSize);
+ Atomix atomix = createAtomixClient();
+ AtomixCounter along = new AtomixCounter("test-long", atomix.getLong("test-long").join());
+ assertEquals(0, along.get().join().longValue());
+ assertEquals(1, along.incrementAndGet().join().longValue());
+ along.set(100).join();
+ assertEquals(100, along.get().join().longValue());
+ assertEquals(100, along.getAndAdd(10).join().longValue());
+ assertEquals(110, along.get().join().longValue());
+ assertFalse(along.compareAndSet(109, 111).join());
+ assertTrue(along.compareAndSet(110, 111).join());
+ assertEquals(100, along.addAndGet(-11).join().longValue());
+ assertEquals(100, along.getAndIncrement().join().longValue());
+ assertEquals(101, along.get().join().longValue());
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
new file mode 100644
index 0000000..d655d52
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.Atomix;
+import io.atomix.AtomixClient;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.LocalServerRegistry;
+import io.atomix.catalyst.transport.LocalTransport;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.copycat.server.CopycatServer;
+import io.atomix.copycat.server.storage.Storage;
+import io.atomix.copycat.server.storage.StorageLevel;
+import io.atomix.manager.state.ResourceManagerState;
+import io.atomix.resource.ResourceRegistry;
+import io.atomix.resource.ResourceType;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.After;
+import org.junit.Before;
+import org.onosproject.store.primitives.impl.CatalystSerializers;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * Base class for various Atomix* tests.
+ */
+public abstract class AtomixTestBase {
+ private static final File TEST_DIR = new File("target/test-logs");
+ protected LocalServerRegistry registry;
+ protected int port;
+ protected List<Address> members;
+ protected List<CopycatClient> copycatClients = new ArrayList<>();
+ protected List<CopycatServer> copycatServers = new ArrayList<>();
+ protected List<Atomix> atomixClients = new ArrayList<>();
+ protected List<CopycatServer> atomixServers = new ArrayList<>();
+ protected Serializer serializer = CatalystSerializers.getSerializer();
+
+ /**
+ * Creates a new resource state machine.
+ *
+ * @return A new resource state machine.
+ */
+ protected abstract ResourceType resourceType();
+
+ /**
+ * Returns the next server address.
+ *
+ * @return The next server address.
+ */
+ private Address nextAddress() {
+ Address address = new Address("localhost", port++);
+ members.add(address);
+ return address;
+ }
+
+ /**
+ * Creates a set of Copycat servers.
+ */
+ protected List<CopycatServer> createCopycatServers(int nodes) throws Throwable {
+ CountDownLatch latch = new CountDownLatch(nodes);
+ List<CopycatServer> servers = new ArrayList<>();
+
+ List<Address> members = new ArrayList<>();
+ for (int i = 0; i < nodes; i++) {
+ members.add(nextAddress());
+ }
+
+ for (int i = 0; i < nodes; i++) {
+ CopycatServer server = createCopycatServer(members.get(i));
+ server.open().thenRun(latch::countDown);
+ servers.add(server);
+ }
+
+ Uninterruptibles.awaitUninterruptibly(latch);
+
+ return servers;
+ }
+
+ /**
+ * Creates a Copycat server.
+ */
+ protected CopycatServer createCopycatServer(Address address) {
+ ResourceRegistry resourceRegistry = new ResourceRegistry();
+ resourceRegistry.register(resourceType());
+ CopycatServer server = CopycatServer.builder(address, members)
+ .withTransport(new LocalTransport(registry))
+ .withStorage(Storage.builder()
+ .withStorageLevel(StorageLevel.DISK)
+ .withDirectory(TEST_DIR + "/" + address.port())
+ .withSerializer(serializer.clone())
+ .build())
+ .withStateMachine(() -> new ResourceManagerState(resourceRegistry))
+ .withSerializer(serializer.clone())
+ .withHeartbeatInterval(Duration.ofMillis(25))
+ .withElectionTimeout(Duration.ofMillis(50))
+ .withSessionTimeout(Duration.ofMillis(100))
+ .build();
+ copycatServers.add(server);
+ return server;
+ }
+
+ @Before
+ @After
+ public void clearTests() throws Exception {
+ registry = new LocalServerRegistry();
+ members = new ArrayList<>();
+ port = 5000;
+
+ CompletableFuture<Void> closeClients =
+ CompletableFuture.allOf(atomixClients.stream()
+ .map(Atomix::close)
+ .toArray(CompletableFuture[]::new));
+
+ closeClients.thenCompose(v -> CompletableFuture.allOf(copycatServers.stream()
+ .map(CopycatServer::close)
+ .toArray(CompletableFuture[]::new))).join();
+
+ deleteDirectory(TEST_DIR);
+
+ atomixClients = new ArrayList<>();
+
+ copycatServers = new ArrayList<>();
+ }
+
+ /**
+ * Deletes a directory recursively.
+ */
+ private void deleteDirectory(File directory) throws IOException {
+ if (directory.exists()) {
+ File[] files = directory.listFiles();
+ if (files != null) {
+ for (File file : files) {
+ if (file.isDirectory()) {
+ deleteDirectory(file);
+ } else {
+ Files.delete(file.toPath());
+ }
+ }
+ }
+ Files.delete(directory.toPath());
+ }
+ }
+
+ /**
+ * Creates a Atomix client.
+ */
+ protected Atomix createAtomixClient() {
+ CountDownLatch latch = new CountDownLatch(1);
+ Atomix client = AtomixClient.builder(members)
+ .withTransport(new LocalTransport(registry))
+ .withSerializer(serializer.clone())
+ .withResourceResolver(r -> r.register(resourceType()))
+ .build();
+ client.open().thenRun(latch::countDown);
+ atomixClients.add(client);
+ Uninterruptibles.awaitUninterruptibly(latch);
+ return client;
+ }
+}