State machine implementations for various distributed primitives based on latest Copycat APIs

Change-Id: I622cc196aa1cdf072a5a0b100a5ffaaf71b07900
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
new file mode 100644
index 0000000..f8e111b
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.serializer.TypeSerializerFactory;
+import io.atomix.copycat.client.Query;
+import io.atomix.manager.state.GetResource;
+import io.atomix.manager.state.GetResourceKeys;
+import io.atomix.resource.ResourceQuery;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.Scanner;
+
+import org.onlab.util.Match;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapState;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
+import org.onosproject.store.primitives.resources.impl.CommitResult;
+import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
+import org.onosproject.store.primitives.resources.impl.MapUpdate;
+import org.onosproject.store.primitives.resources.impl.PrepareResult;
+import org.onosproject.store.primitives.resources.impl.RollbackResult;
+import org.onosproject.store.primitives.resources.impl.TransactionId;
+import org.onosproject.store.primitives.resources.impl.TransactionalMapUpdate;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+
+/**
+ * Serializer utility for Atomix Catalyst.
+ */
+public final class CatalystSerializers {
+
+    private CatalystSerializers() {
+    }
+
+    public static Serializer getSerializer() {
+        Serializer serializer = new Serializer();
+        TypeSerializerFactory factory =
+                new DefaultCatalystTypeSerializerFactory(
+                        org.onosproject.store.service.Serializer.using(Arrays.asList((KryoNamespaces.API)),
+                                MapEntryUpdateResult.class,
+                                MapEntryUpdateResult.Status.class,
+                                MapUpdate.class,
+                                MapUpdate.Type.class,
+                                TransactionalMapUpdate.class,
+                                TransactionId.class,
+                                PrepareResult.class,
+                                CommitResult.class,
+                                RollbackResult.class,
+                                AtomixConsistentMapCommands.Get.class,
+                                AtomixConsistentMapCommands.ContainsKey.class,
+                                AtomixConsistentMapCommands.ContainsValue.class,
+                                AtomixConsistentMapCommands.Size.class,
+                                AtomixConsistentMapCommands.IsEmpty.class,
+                                AtomixConsistentMapCommands.KeySet.class,
+                                AtomixConsistentMapCommands.EntrySet.class,
+                                AtomixConsistentMapCommands.Values.class,
+                                AtomixConsistentMapCommands.UpdateAndGet.class,
+                                AtomixConsistentMapCommands.TransactionPrepare.class,
+                                AtomixConsistentMapCommands.TransactionCommit.class,
+                                AtomixConsistentMapCommands.TransactionRollback.class,
+                                AtomixLeaderElectorCommands.GetLeadership.class,
+                                AtomixLeaderElectorCommands.GetAllLeaderships.class,
+                                AtomixLeaderElectorCommands.GetElectedTopics.class,
+                                AtomixLeaderElectorCommands.Run.class,
+                                AtomixLeaderElectorCommands.Withdraw.class,
+                                AtomixLeaderElectorCommands.Anoint.class,
+                                GetResource.class,
+                                GetResourceKeys.class,
+                                ResourceQuery.class,
+                                Query.ConsistencyLevel.class));
+        // ONOS classes
+        serializer.register(Change.class, factory);
+        serializer.register(NodeId.class, factory);
+        serializer.register(Match.class, factory);
+        serializer.register(MapEntryUpdateResult.class, factory);
+        serializer.register(MapEntryUpdateResult.Status.class, factory);
+        serializer.register(TransactionalMapUpdate.class, factory);
+        serializer.register(PrepareResult.class, factory);
+        serializer.register(CommitResult.class, factory);
+        serializer.register(RollbackResult.class, factory);
+        serializer.register(TransactionId.class, factory);
+        serializer.register(MapUpdate.class, factory);
+        serializer.register(Versioned.class, factory);
+        serializer.register(MapEvent.class, factory);
+        serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
+        serializer.register(AtomixConsistentMapState.class, factory);
+
+        serializer.register(ResourceQuery.class, factory);
+        serializer.register(GetResource.class, factory);
+        serializer.register(GetResourceKeys.class, factory);
+
+        // ConsistentMap
+        serializer.register(AtomixConsistentMapCommands.UpdateAndGet.class, factory);
+        serializer.register(AtomixConsistentMapCommands.Clear.class);
+        serializer.register(AtomixConsistentMapCommands.Listen.class);
+        serializer.register(AtomixConsistentMapCommands.Unlisten.class);
+        serializer.register(AtomixConsistentMapCommands.Get.class);
+        serializer.register(AtomixConsistentMapCommands.ContainsKey.class);
+        serializer.register(AtomixConsistentMapCommands.ContainsValue.class);
+        serializer.register(AtomixConsistentMapCommands.EntrySet.class);
+        serializer.register(AtomixConsistentMapCommands.IsEmpty.class);
+        serializer.register(AtomixConsistentMapCommands.KeySet.class);
+        serializer.register(AtomixConsistentMapCommands.Size.class);
+        serializer.register(AtomixConsistentMapCommands.Values.class);
+        serializer.register(AtomixConsistentMapCommands.TransactionPrepare.class);
+        serializer.register(AtomixConsistentMapCommands.TransactionCommit.class);
+        serializer.register(AtomixConsistentMapCommands.TransactionRollback.class);
+        // LeaderElector
+        serializer.register(AtomixLeaderElectorCommands.Run.class, factory);
+        serializer.register(AtomixLeaderElectorCommands.Withdraw.class, factory);
+        serializer.register(AtomixLeaderElectorCommands.Anoint.class, factory);
+        serializer.register(AtomixLeaderElectorCommands.GetElectedTopics.class, factory);
+        serializer.register(AtomixLeaderElectorCommands.GetElectedTopics.class, factory);
+        serializer.register(AtomixLeaderElectorCommands.GetLeadership.class, factory);
+        serializer.register(AtomixLeaderElectorCommands.GetAllLeaderships.class, factory);
+        serializer.register(AtomixLeaderElectorCommands.Listen.class);
+        serializer.register(AtomixLeaderElectorCommands.Unlisten.class);
+        // Atomix types
+        try {
+            ClassLoader cl = CatalystSerializable.class.getClassLoader();
+            Enumeration<URL> urls = cl.getResources(
+                    String.format("META-INF/services/%s", CatalystSerializable.class.getName()));
+            while (urls.hasMoreElements()) {
+                URL url = urls.nextElement();
+                try (Scanner scanner = new Scanner(url.openStream(), "UTF-8")) {
+                    scanner.useDelimiter("\n").forEachRemaining(line -> {
+                        if (!line.trim().startsWith("#")) {
+                            line = line.trim();
+                            if (line.length() > 0) {
+                                try {
+                                    serializer.register(cl.loadClass(line));
+                                } catch (ClassNotFoundException e) {
+                                    Throwables.propagate(e);
+                                }
+                            }
+                        }
+                    });
+                }
+            }
+        } catch (IOException e) {
+            Throwables.propagate(e);
+        }
+        return serializer;
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
new file mode 100644
index 0000000..46487f2
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.catalyst.util.Listener;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.Consistency;
+import io.atomix.resource.Resource;
+import io.atomix.resource.ResourceTypeInfo;
+
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+
+import org.onlab.util.Match;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Distributed resource providing the {@link AsyncConsistentMap} primitive.
+ */
+@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
+public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
+    implements AsyncConsistentMap<String, byte[]> {
+
+    private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
+
+    private static final String CHANGE_SUBJECT = "change";
+
+    public AtomixConsistentMap(CopycatClient client, Resource.Options options) {
+        super(client, options);
+    }
+
+    @Override
+    public String name() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<AtomixConsistentMap> open() {
+        return super.open().thenApply(result -> {
+            client.session().onEvent(CHANGE_SUBJECT, this::handleEvent);
+            return result;
+        });
+    }
+
+    private void handleEvent(MapEvent<String, byte[]> event) {
+        mapEventListeners.forEach(listener -> listener.event(event));
+    }
+
+    @Override
+    public AtomixConsistentMap with(Consistency consistency) {
+        super.with(consistency);
+        return this;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isEmpty() {
+        return submit(new AtomixConsistentMapCommands.IsEmpty());
+    }
+
+    @Override
+    public CompletableFuture<Integer> size() {
+        return submit(new AtomixConsistentMapCommands.Size());
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsKey(String key) {
+        return submit(new AtomixConsistentMapCommands.ContainsKey(key));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsValue(byte[] value) {
+        return submit(new AtomixConsistentMapCommands.ContainsValue(value));
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> get(String key) {
+        return submit(new AtomixConsistentMapCommands.Get(key));
+    }
+
+    @Override
+    public CompletableFuture<Set<String>> keySet() {
+        return submit(new AtomixConsistentMapCommands.KeySet());
+    }
+
+    @Override
+    public CompletableFuture<Collection<Versioned<byte[]>>> values() {
+        return submit(new AtomixConsistentMapCommands.Values());
+    }
+
+    @Override
+    public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
+        return submit(new AtomixConsistentMapCommands.EntrySet());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
+        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
+                .whenComplete((r, e) -> throwIfLocked(r.status()))
+                .thenApply(v -> v.oldValue());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
+        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
+                .whenComplete((r, e) -> throwIfLocked(r.status()))
+                .thenApply(v -> v.newValue());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
+        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NULL, Match.ANY))
+                .whenComplete((r, e) -> throwIfLocked(r.status()))
+                .thenApply(v -> v.oldValue());
+    }
+    @Override
+    @SuppressWarnings("unchecked")
+    public CompletableFuture<Versioned<byte[]>> remove(String key) {
+        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ANY))
+                .whenComplete((r, e) -> throwIfLocked(r.status()))
+                .thenApply(v -> v.oldValue());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public CompletableFuture<Boolean> remove(String key, byte[] value) {
+        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
+                .whenComplete((r, e) -> throwIfLocked(r.status()))
+                .thenApply(v -> v.updated());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public CompletableFuture<Boolean> remove(String key, long version) {
+        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
+                .whenComplete((r, e) -> throwIfLocked(r.status()))
+                .thenApply(v -> v.updated());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
+        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
+                .whenComplete((r, e) -> throwIfLocked(r.status()))
+                .thenApply(v -> v.oldValue());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
+        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
+                newValue,
+                Match.ifValue(oldValue),
+                Match.ANY))
+                .whenComplete((r, e) -> throwIfLocked(r.status()))
+                .thenApply(v -> v.updated());
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
+        return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
+                newValue,
+                Match.ANY,
+                Match.ifValue(oldVersion)))
+                .whenComplete((r, e) -> throwIfLocked(r.status()))
+                .thenApply(v -> v.updated());
+    }
+
+    @Override
+    public CompletableFuture<Void> clear() {
+        return submit(new AtomixConsistentMapCommands.Clear())
+                .whenComplete((r, e) -> throwIfLocked(r))
+                .thenApply(v -> null);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public CompletableFuture<Versioned<byte[]>> computeIf(String key,
+            Predicate<? super byte[]> condition,
+                    BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
+        return get(key).thenCompose(r1 -> {
+            byte[] existingValue = r1 == null ? null : r1.value();
+            // if the condition evaluates to false, return existing value.
+            if (!condition.test(existingValue)) {
+                return CompletableFuture.completedFuture(r1);
+            }
+
+            AtomicReference<byte[]> computedValue = new AtomicReference<>();
+            // if remappingFunction throws an exception, return the exception.
+            try {
+                computedValue.set(remappingFunction.apply(key, existingValue));
+            } catch (Exception e) {
+                CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
+                future.completeExceptionally(e);
+                return future;
+            }
+            if (computedValue.get() == null && r1 == null) {
+                return CompletableFuture.completedFuture(null);
+            }
+            Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
+            Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
+            return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
+                    computedValue.get(),
+                    valueMatch,
+                    versionMatch))
+                    .whenComplete((r, e) -> throwIfLocked(r.status()))
+                    .thenApply(v -> v.newValue());
+        });
+    }
+
+    public CompletableFuture<PrepareResult> prepare(TransactionalMapUpdate<String, byte[]> update) {
+        return submit(new AtomixConsistentMapCommands.TransactionPrepare(update));
+    }
+
+    public CompletableFuture<CommitResult> commit(TransactionId transactionId) {
+        return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId));
+    }
+
+    public CompletableFuture<RollbackResult> rollback(TransactionId transactionId) {
+        return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId));
+    }
+
+    @Override
+    public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
+        if (!mapEventListeners.isEmpty()) {
+            if (mapEventListeners.add(listener)) {
+                return CompletableFuture.completedFuture(new ChangeListener(listener)).thenApply(v -> null);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }
+        mapEventListeners.add(listener);
+        return submit(new AtomixConsistentMapCommands.Listen()).thenApply(v -> null);
+    }
+
+    @Override
+    public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
+        if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
+            return submit(new AtomixConsistentMapCommands.Unlisten()).thenApply(v -> null);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private void throwIfLocked(MapEntryUpdateResult.Status status) {
+        if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
+            throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
+        }
+    }
+
+    /**
+     * Change listener context.
+     */
+    private final class ChangeListener implements Listener<MapEvent<String, byte[]>> {
+        private final MapEventListener<String, byte[]> listener;
+
+        private ChangeListener(MapEventListener<String, byte[]> listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void accept(MapEvent<String, byte[]> event) {
+            listener.event(event);
+        }
+
+        @Override
+        public void close() {
+            synchronized (AtomixConsistentMap.this) {
+                mapEventListeners.remove(listener);
+                if (mapEventListeners.isEmpty()) {
+                    submit(new AtomixConsistentMapCommands.Unlisten());
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
new file mode 100644
index 0000000..c463320
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -0,0 +1,515 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.catalyst.buffer.BufferInput;
+import io.atomix.catalyst.buffer.BufferOutput;
+import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.util.Assert;
+import io.atomix.copycat.client.Command;
+import io.atomix.copycat.client.Query;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.onlab.util.Match;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * {@link AtomixConsistentMap} resource state machine operations.
+ */
+public final class AtomixConsistentMapCommands {
+
+    private AtomixConsistentMapCommands() {
+    }
+
+    /**
+     * Abstract map command.
+     */
+    @SuppressWarnings("serial")
+    public abstract static class MapCommand<V> implements Command<V>, CatalystSerializable {
+
+        @Override
+        public ConsistencyLevel consistency() {
+          return ConsistencyLevel.LINEARIZABLE;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .toString();
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+        }
+    }
+
+    /**
+     * Abstract map query.
+     */
+    @SuppressWarnings("serial")
+    public abstract static class MapQuery<V> implements Query<V>, CatalystSerializable {
+
+        @Override
+        public ConsistencyLevel consistency() {
+          return ConsistencyLevel.BOUNDED_LINEARIZABLE;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .toString();
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+        }
+    }
+
+    /**
+     * Abstract key-based query.
+     */
+    @SuppressWarnings("serial")
+    public abstract static class KeyQuery<V> extends MapQuery<V> {
+        protected String key;
+
+        public KeyQuery() {
+        }
+
+        public KeyQuery(String key) {
+            this.key = Assert.notNull(key, "key");
+        }
+
+        /**
+         * Returns the key.
+         * @return key
+         */
+        public String key() {
+            return key;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("key", key)
+                    .toString();
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            super.writeObject(buffer, serializer);
+            serializer.writeObject(key, buffer);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            super.readObject(buffer, serializer);
+            key = serializer.readObject(buffer);
+        }
+    }
+
+    /**
+     * Abstract key-based query.
+     */
+    @SuppressWarnings("serial")
+    public abstract static class ValueQuery<V> extends MapQuery<V> {
+        protected byte[] value;
+
+        public ValueQuery() {
+        }
+
+        public ValueQuery(byte[] value) {
+            this.value = Assert.notNull(value, "value");
+        }
+
+        /**
+         * Returns the key.
+         * @return key
+         */
+        public byte[] value() {
+            return value;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            super.writeObject(buffer, serializer);
+            serializer.writeObject(value, buffer);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            super.readObject(buffer, serializer);
+            value = serializer.readObject(buffer);
+        }
+    }
+
+    /**
+     * Contains key command.
+     */
+    @SuppressWarnings("serial")
+    public static class ContainsKey extends KeyQuery<Boolean> {
+        public ContainsKey() {
+        }
+
+        public ContainsKey(String key) {
+            super(key);
+        }
+    }
+
+    /**
+     * Contains key command.
+     */
+    @SuppressWarnings("serial")
+    public static class ContainsValue extends ValueQuery<Boolean> {
+        public ContainsValue() {
+        }
+
+        public ContainsValue(byte[] value) {
+            super(value);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("value", value)
+                    .toString();
+        }
+    }
+
+    /**
+     * Map prepare command.
+     */
+    @SuppressWarnings("serial")
+    public static class TransactionPrepare extends MapCommand<PrepareResult> {
+        private TransactionalMapUpdate<String, byte[]> update;
+
+        public TransactionPrepare() {
+        }
+
+        public TransactionPrepare(TransactionalMapUpdate<String, byte[]> update) {
+            this.update = update;
+        }
+
+        public TransactionalMapUpdate<String, byte[]> transactionUpdate() {
+            return update;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            super.writeObject(buffer, serializer);
+            serializer.writeObject(update, buffer);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            super.readObject(buffer, serializer);
+            update = serializer.readObject(buffer);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("update", update)
+                    .toString();
+        }
+    }
+
+    /**
+     * Map transaction commit command.
+     */
+    @SuppressWarnings("serial")
+    public static class TransactionCommit extends MapCommand<CommitResult> {
+        private TransactionId transactionId;
+
+        public TransactionCommit() {
+        }
+
+        public TransactionCommit(TransactionId transactionId) {
+            this.transactionId = transactionId;
+        }
+
+        /**
+         * Returns the transaction identifier.
+         * @return transaction id
+         */
+        public TransactionId transactionId() {
+            return transactionId;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            super.writeObject(buffer, serializer);
+            serializer.writeObject(transactionId, buffer);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            super.readObject(buffer, serializer);
+            transactionId = serializer.readObject(buffer);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("transactionId", transactionId)
+                    .toString();
+        }
+    }
+
+    /**
+     * Map transaction rollback command.
+     */
+    @SuppressWarnings("serial")
+    public static class TransactionRollback extends MapCommand<RollbackResult> {
+        private TransactionId transactionId;
+
+        public TransactionRollback() {
+        }
+
+        public TransactionRollback(TransactionId transactionId) {
+            this.transactionId = transactionId;
+        }
+
+        /**
+         * Returns the transaction identifier.
+         * @return transaction id
+         */
+        public TransactionId transactionId() {
+            return transactionId;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            super.writeObject(buffer, serializer);
+            serializer.writeObject(transactionId, buffer);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            super.readObject(buffer, serializer);
+            transactionId = serializer.readObject(buffer);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("transactionId", transactionId)
+                    .toString();
+        }
+    }
+
+    /**
+     * Map update command.
+     */
+    @SuppressWarnings("serial")
+    public static class UpdateAndGet extends MapCommand<MapEntryUpdateResult<String, byte[]>> {
+        private String key;
+        private byte[] value;
+        private Match<byte[]> valueMatch;
+        private Match<Long> versionMatch;
+
+        public UpdateAndGet() {
+        }
+
+        public UpdateAndGet(String key,
+                        byte[] value,
+                        Match<byte[]> valueMatch,
+                        Match<Long> versionMatch) {
+            this.key = key;
+            this.value = value;
+            this.valueMatch = valueMatch;
+            this.versionMatch = versionMatch;
+        }
+
+        /**
+         * Returns the key.
+         * @return key
+         */
+        public String key() {
+            return this.key;
+        }
+
+        /**
+         * Returns the value.
+         * @return value
+         */
+        public byte[] value() {
+            return this.value;
+        }
+
+        /**
+         * Returns the value match.
+         * @return value match
+         */
+        public Match<byte[]> valueMatch() {
+            return this.valueMatch;
+        }
+
+        /**
+         * Returns the version match.
+         * @return version match
+         */
+        public Match<Long> versionMatch() {
+            return this.versionMatch;
+        }
+
+        @Override
+        public CompactionMode compaction() {
+          return value == null ? CompactionMode.FULL : CompactionMode.QUORUM;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            super.writeObject(buffer, serializer);
+            serializer.writeObject(key, buffer);
+            serializer.writeObject(value, buffer);
+            serializer.writeObject(valueMatch, buffer);
+            serializer.writeObject(versionMatch, buffer);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+            super.readObject(buffer, serializer);
+            key = serializer.readObject(buffer);
+            value = serializer.readObject(buffer);
+            valueMatch = serializer.readObject(buffer);
+            versionMatch = serializer.readObject(buffer);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("key", key)
+                    .add("value", value)
+                    .add("valueMatch", valueMatch)
+                    .add("versionMatch", versionMatch)
+                    .toString();
+        }
+    }
+
+    /**
+     * Get query.
+     */
+    @SuppressWarnings("serial")
+    public static class Get extends KeyQuery<Versioned<byte[]>> {
+        public Get() {
+        }
+
+        public Get(String key) {
+            super(key);
+        }
+    }
+
+    /**
+     * Is empty query.
+     */
+    @SuppressWarnings("serial")
+    public static class IsEmpty extends MapQuery<Boolean> {
+    }
+
+    /**
+     * KeySet query.
+     */
+    @SuppressWarnings("serial")
+    public static class KeySet extends MapQuery<Set<String>> {
+    }
+
+    /**
+     * KeySet query.
+     */
+    @SuppressWarnings("serial")
+    public static class Values extends MapQuery<Collection<Versioned<byte[]>>> {
+    }
+
+    /**
+     * KeySet query.
+     */
+    @SuppressWarnings("serial")
+    public static class EntrySet extends MapQuery<Set<Map.Entry<String, Versioned<byte[]>>>> {
+    }
+
+    /**
+     * Size query.
+     */
+    @SuppressWarnings("serial")
+    public static class Size extends MapQuery<Integer> {
+    }
+
+    /**
+     * Clear command.
+     */
+    @SuppressWarnings("serial")
+    public static class Clear extends MapCommand<MapEntryUpdateResult.Status> {
+
+        @Override
+        public CompactionMode compaction() {
+          return CompactionMode.FULL;
+        }
+    }
+
+    /**
+     * Change listen.
+     */
+    @SuppressWarnings("serial")
+    public static class Listen implements Command<Void>, CatalystSerializable {
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .toString();
+        }
+    }
+
+    /**
+     * Change unlisten.
+     */
+    @SuppressWarnings("serial")
+    public static class Unlisten implements Command<Void>, CatalystSerializable {
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .toString();
+        }
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
new file mode 100644
index 0000000..100941f
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -0,0 +1,626 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import static org.onosproject.store.service.MapEvent.Type.INSERT;
+import static org.onosproject.store.service.MapEvent.Type.REMOVE;
+import static org.onosproject.store.service.MapEvent.Type.UPDATE;
+import io.atomix.copycat.client.session.Session;
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.SessionListener;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import io.atomix.resource.ResourceStateMachine;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.onlab.util.CountDownCompleter;
+import org.onlab.util.Match;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * State Machine for {@link AtomixConsistentMap} resource.
+ */
+public class AtomixConsistentMapState extends ResourceStateMachine implements
+        SessionListener, Snapshottable {
+    private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
+    private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
+    private final Set<String> preparedKeys = Sets.newHashSet();
+    private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps
+            .newHashMap();
+    private AtomicLong versionCounter = new AtomicLong(0);
+
+    @Override
+    public void snapshot(SnapshotWriter writer) {
+        writer.writeLong(versionCounter.get());
+    }
+
+    @Override
+    public void install(SnapshotReader reader) {
+        versionCounter = new AtomicLong(reader.readLong());
+    }
+
+    @Override
+    protected void configure(StateMachineExecutor executor) {
+        // Listeners
+        executor.register(AtomixConsistentMapCommands.Listen.class,
+                this::listen);
+        executor.register(AtomixConsistentMapCommands.Unlisten.class,
+                this::unlisten);
+        // Queries
+        executor.register(AtomixConsistentMapCommands.ContainsKey.class,
+                this::containsKey);
+        executor.register(AtomixConsistentMapCommands.ContainsValue.class,
+                this::containsValue);
+        executor.register(AtomixConsistentMapCommands.EntrySet.class,
+                this::entrySet);
+        executor.register(AtomixConsistentMapCommands.Get.class, this::get);
+        executor.register(AtomixConsistentMapCommands.IsEmpty.class,
+                this::isEmpty);
+        executor.register(AtomixConsistentMapCommands.KeySet.class,
+                this::keySet);
+        executor.register(AtomixConsistentMapCommands.Size.class, this::size);
+        executor.register(AtomixConsistentMapCommands.Values.class,
+                this::values);
+        // Commands
+        executor.register(AtomixConsistentMapCommands.UpdateAndGet.class,
+                this::updateAndGet);
+        executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
+        executor.register(AtomixConsistentMapCommands.TransactionPrepare.class,
+                this::prepare);
+        executor.register(AtomixConsistentMapCommands.TransactionCommit.class,
+                this::commit);
+        executor.register(
+                AtomixConsistentMapCommands.TransactionRollback.class,
+                this::rollback);
+    }
+
+    @Override
+    public void delete() {
+        // Delete Listeners
+        listeners.values().forEach(Commit::close);
+        listeners.clear();
+
+        // Delete Map entries
+        mapEntries.values().forEach(MapEntryValue::discard);
+        mapEntries.clear();
+    }
+
+    /**
+     * Handles a contains key commit.
+     *
+     * @param commit
+     *            containsKey commit
+     * @return {@code true} if map contains key
+     */
+    protected boolean containsKey(
+            Commit<? extends AtomixConsistentMapCommands.ContainsKey> commit) {
+        try {
+            return toVersioned(mapEntries.get(commit.operation().key())) != null;
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a contains value commit.
+     *
+     * @param commit
+     *            containsValue commit
+     * @return {@code true} if map contains value
+     */
+    protected boolean containsValue(
+            Commit<? extends AtomixConsistentMapCommands.ContainsValue> commit) {
+        try {
+            Match<byte[]> valueMatch = Match
+                    .ifValue(commit.operation().value());
+            return mapEntries.values().stream()
+                    .anyMatch(value -> valueMatch.matches(value.value()));
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a get commit.
+     *
+     * @param commit
+     *            get commit
+     * @return value mapped to key
+     */
+    protected Versioned<byte[]> get(
+            Commit<? extends AtomixConsistentMapCommands.Get> commit) {
+        try {
+            return toVersioned(mapEntries.get(commit.operation().key()));
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a count commit.
+     *
+     * @param commit
+     *            size commit
+     * @return number of entries in map
+     */
+    protected int size(Commit<? extends AtomixConsistentMapCommands.Size> commit) {
+        try {
+            return mapEntries.size();
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles an is empty commit.
+     *
+     * @param commit
+     *            isEmpty commit
+     * @return {@code true} if map is empty
+     */
+    protected boolean isEmpty(
+            Commit<? extends AtomixConsistentMapCommands.IsEmpty> commit) {
+        try {
+            return mapEntries.isEmpty();
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a keySet commit.
+     *
+     * @param commit
+     *            keySet commit
+     * @return set of keys in map
+     */
+    protected Set<String> keySet(
+            Commit<? extends AtomixConsistentMapCommands.KeySet> commit) {
+        try {
+            return mapEntries.keySet();
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a values commit.
+     *
+     * @param commit
+     *            values commit
+     * @return collection of values in map
+     */
+    protected Collection<Versioned<byte[]>> values(
+            Commit<? extends AtomixConsistentMapCommands.Values> commit) {
+        try {
+            return mapEntries.values().stream().map(this::toVersioned)
+                    .collect(Collectors.toList());
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a entry set commit.
+     *
+     * @param commit
+     *            entrySet commit
+     * @return set of map entries
+     */
+    protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(
+            Commit<? extends AtomixConsistentMapCommands.EntrySet> commit) {
+        try {
+            return mapEntries
+                    .entrySet()
+                    .stream()
+                    .map(e -> Maps.immutableEntry(e.getKey(),
+                            toVersioned(e.getValue())))
+                    .collect(Collectors.toSet());
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a update and get commit.
+     *
+     * @param commit
+     *            updateAndGet commit
+     * @return update result
+     */
+    protected MapEntryUpdateResult<String, byte[]> updateAndGet(
+            Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
+        MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
+        String key = commit.operation().key();
+        MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
+        Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
+
+        if (updateStatus != MapEntryUpdateResult.Status.OK) {
+            commit.close();
+            return new MapEntryUpdateResult<>(updateStatus, "", key,
+                    oldMapValue, oldMapValue);
+        }
+
+        byte[] newValue = commit.operation().value();
+        long newVersion = versionCounter.incrementAndGet();
+        Versioned<byte[]> newMapValue = newValue == null ? null
+                : new Versioned<>(newValue, newVersion);
+
+        MapEvent.Type updateType = newValue == null ? REMOVE
+                : oldCommitValue == null ? INSERT : UPDATE;
+        if (updateType == REMOVE || updateType == UPDATE) {
+            mapEntries.remove(key);
+            oldCommitValue.discard();
+        }
+        if (updateType == INSERT || updateType == UPDATE) {
+            mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
+        }
+        notify(new MapEvent<>("", key, newMapValue, oldMapValue));
+        return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
+                newMapValue);
+    }
+
+    /**
+     * Handles a clear commit.
+     *
+     * @param commit
+     *            clear commit
+     * @return clear result
+     */
+    protected MapEntryUpdateResult.Status clear(
+            Commit<? extends AtomixConsistentMapCommands.Clear> commit) {
+        try {
+            Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
+                    .entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<String, MapEntryValue> entry = iterator.next();
+                String key = entry.getKey();
+                MapEntryValue value = entry.getValue();
+                Versioned<byte[]> removedValue = new Versioned<>(value.value(),
+                        value.version());
+                notify(new MapEvent<>("", key, null, removedValue));
+                value.discard();
+                iterator.remove();
+            }
+            return MapEntryUpdateResult.Status.OK;
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles a listen commit.
+     *
+     * @param commit
+     *            listen commit
+     */
+    protected void listen(
+            Commit<? extends AtomixConsistentMapCommands.Listen> commit) {
+        Long sessionId = commit.session().id();
+        listeners.put(sessionId, commit);
+        commit.session()
+                .onStateChange(
+                        state -> {
+                            if (state == Session.State.CLOSED
+                                    || state == Session.State.EXPIRED) {
+                                Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
+                                        .remove(sessionId);
+                                if (listener != null) {
+                                    listener.close();
+                                }
+                            }
+                        });
+    }
+
+    /**
+     * Handles an unlisten commit.
+     *
+     * @param commit
+     *            unlisten commit
+     */
+    protected void unlisten(
+            Commit<? extends AtomixConsistentMapCommands.Unlisten> commit) {
+        try {
+            Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
+                    .remove(commit.session());
+            if (listener != null) {
+                listener.close();
+            }
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Triggers a change event.
+     *
+     * @param value
+     *            map event
+     */
+    private void notify(MapEvent<String, byte[]> value) {
+        listeners.values().forEach(
+                commit -> commit.session().publish("change", value));
+    }
+
+    /**
+     * Handles an prepare commit.
+     *
+     * @param commit
+     *            transaction prepare commit
+     * @return prepare result
+     */
+    protected PrepareResult prepare(
+            Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> commit) {
+        boolean ok = false;
+        try {
+            TransactionalMapUpdate<String, byte[]> transactionUpdate = commit
+                    .operation().transactionUpdate();
+            for (MapUpdate<String, byte[]> update : transactionUpdate.batch()) {
+                String key = update.key();
+                if (preparedKeys.contains(key)) {
+                    return PrepareResult.CONCURRENT_TRANSACTION;
+                }
+                MapEntryValue existingValue = mapEntries.get(key);
+                if (existingValue == null) {
+                    if (update.currentValue() != null) {
+                        return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
+                    }
+                } else {
+                    if (existingValue.version() != update.currentVersion()) {
+                        return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
+                    }
+                }
+            }
+            // No violations detected. Add to pendingTranctions and mark
+            // modified keys as
+            // currently locked to updates.
+            pendingTransactions.put(transactionUpdate.transactionId(), commit);
+            transactionUpdate.batch().forEach(u -> preparedKeys.add(u.key()));
+            ok = true;
+            return PrepareResult.OK;
+        } finally {
+            if (!ok) {
+                commit.close();
+            }
+        }
+    }
+
+    /**
+     * Handles an commit commit (ha!).
+     *
+     * @param commit transaction commit commit
+     * @return commit result
+     */
+    protected CommitResult commit(
+            Commit<? extends AtomixConsistentMapCommands.TransactionCommit> commit) {
+        TransactionId transactionId = commit.operation().transactionId();
+        try {
+            Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
+                    .remove(transactionId);
+            if (prepareCommit == null) {
+                return CommitResult.UNKNOWN_TRANSACTION_ID;
+            }
+            TransactionalMapUpdate<String, byte[]> transactionalUpdate = prepareCommit
+                    .operation().transactionUpdate();
+            long totalReferencesToCommit = transactionalUpdate
+                    .batch()
+                    .stream()
+                    .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+                    .count();
+            CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer =
+                    new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
+            for (MapUpdate<String, byte[]> update : transactionalUpdate.batch()) {
+                String key = update.key();
+                MapEntryValue previousValue = mapEntries.remove(key);
+                MapEntryValue newValue = null;
+                checkState(preparedKeys.remove(key), "key is not prepared");
+                if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
+                    newValue = new TransactionalCommit(key,
+                            versionCounter.incrementAndGet(), completer);
+                }
+                mapEntries.put(key, newValue);
+                // Notify map listeners
+                notify(new MapEvent<>("", key, toVersioned(newValue),
+                        toVersioned(previousValue)));
+            }
+            return CommitResult.OK;
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Handles an rollback commit (ha!).
+     *
+     * @param commit transaction rollback commit
+     * @return rollback result
+     */
+    protected RollbackResult rollback(
+            Commit<? extends AtomixConsistentMapCommands.TransactionRollback> commit) {
+        TransactionId transactionId = commit.operation().transactionId();
+        try {
+            Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
+                    .remove(transactionId);
+            if (prepareCommit == null) {
+                return RollbackResult.UNKNOWN_TRANSACTION_ID;
+            } else {
+                prepareCommit.operation().transactionUpdate().batch()
+                        .forEach(u -> preparedKeys.remove(u.key()));
+                prepareCommit.close();
+                return RollbackResult.OK;
+            }
+        } finally {
+            commit.close();
+        }
+    }
+
+    private MapEntryUpdateResult.Status validate(
+            AtomixConsistentMapCommands.UpdateAndGet update) {
+        MapEntryValue existingValue = mapEntries.get(update.key());
+        if (existingValue == null && update.value() == null) {
+            return MapEntryUpdateResult.Status.NOOP;
+        }
+        if (preparedKeys.contains(update.key())) {
+            return MapEntryUpdateResult.Status.WRITE_LOCK;
+        }
+        byte[] existingRawValue = existingValue == null ? null : existingValue
+                .value();
+        Long existingVersion = existingValue == null ? null : existingValue
+                .version();
+        return update.valueMatch().matches(existingRawValue)
+                && update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
+                : MapEntryUpdateResult.Status.PRECONDITION_FAILED;
+    }
+
+    private Versioned<byte[]> toVersioned(MapEntryValue value) {
+        return value == null ? null : new Versioned<>(value.value(),
+                value.version());
+    }
+
+    @Override
+    public void register(Session session) {
+    }
+
+    @Override
+    public void unregister(Session session) {
+        closeListener(session.id());
+    }
+
+    @Override
+    public void expire(Session session) {
+        closeListener(session.id());
+    }
+
+    @Override
+    public void close(Session session) {
+        closeListener(session.id());
+    }
+
+    private void closeListener(Long sessionId) {
+        Commit<? extends AtomixConsistentMapCommands.Listen> commit = listeners
+                .remove(sessionId);
+        if (commit != null) {
+            commit.close();
+        }
+    }
+
+    /**
+     * Interface implemented by map values.
+     */
+    private interface MapEntryValue {
+        /**
+         * Returns the raw {@code byte[]}.
+         *
+         * @return raw value
+         */
+        byte[] value();
+
+        /**
+         * Returns the version of the value.
+         *
+         * @return version
+         */
+        long version();
+
+        /**
+         * Discards the value by invoke appropriate clean up actions.
+         */
+        void discard();
+    }
+
+    /**
+     * A {@code MapEntryValue} that is derived from a non-transactional update
+     * i.e. via any standard map update operation.
+     */
+    private class NonTransactionalCommit implements MapEntryValue {
+        private final long version;
+        private final Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit;
+
+        public NonTransactionalCommit(
+                long version,
+                Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
+            this.version = version;
+            this.commit = commit;
+        }
+
+        @Override
+        public byte[] value() {
+            return commit.operation().value();
+        }
+
+        @Override
+        public long version() {
+            return version;
+        }
+
+        @Override
+        public void discard() {
+            commit.close();
+        }
+    }
+
+    /**
+     * A {@code MapEntryValue} that is derived from updates submitted via a
+     * transaction.
+     */
+    private class TransactionalCommit implements MapEntryValue {
+        private final String key;
+        private final long version;
+        private final CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer;
+
+        public TransactionalCommit(
+                String key,
+                long version,
+                CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> commit) {
+            this.key = key;
+            this.version = version;
+            this.completer = commit;
+        }
+
+        @Override
+        public byte[] value() {
+            TransactionalMapUpdate<String, byte[]> update = completer.object()
+                    .operation().transactionUpdate();
+            return update.valueForKey(key);
+        }
+
+        @Override
+        public long version() {
+            return version;
+        }
+
+        @Override
+        public void discard() {
+            completer.countDown();
+        }
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixCounter.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixCounter.java
new file mode 100644
index 0000000..34ae507
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixCounter.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.variables.DistributedLong;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.onosproject.store.service.AsyncAtomicCounter;
+
+/**
+ * {@code AsyncAtomicCounter} implementation backed by Atomix
+ * {@link DistributedLong}.
+ */
+public class AtomixCounter implements AsyncAtomicCounter {
+
+    private final String name;
+    private final DistributedLong distLong;
+
+    public AtomixCounter(String name, DistributedLong distLong) {
+        this.name = name;
+        this.distLong = distLong;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public CompletableFuture<Long> incrementAndGet() {
+        return distLong.incrementAndGet();
+    }
+
+    @Override
+    public CompletableFuture<Long> getAndIncrement() {
+        return distLong.getAndIncrement();
+    }
+
+    @Override
+    public CompletableFuture<Long> getAndAdd(long delta) {
+        return distLong.getAndAdd(delta);
+    }
+
+    @Override
+    public CompletableFuture<Long> addAndGet(long delta) {
+        return distLong.addAndGet(delta);
+    }
+
+    @Override
+    public CompletableFuture<Long> get() {
+        return distLong.get();
+    }
+
+    @Override
+    public CompletableFuture<Void> set(long value) {
+        return distLong.set(value);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> compareAndSet(long expectedValue,
+            long updateValue) {
+        return distLong.compareAndSet(expectedValue, updateValue);
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
new file mode 100644
index 0000000..b18d3da
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.catalyst.util.Listener;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.Consistency;
+import io.atomix.resource.Resource;
+import io.atomix.resource.ResourceTypeInfo;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+import org.onosproject.store.service.AsyncLeaderElector;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Distributed resource providing the {@link AsyncLeaderElector} primitive.
+ */
+@ResourceTypeInfo(id = -152, stateMachine = AtomixLeaderElectorState.class)
+public class AtomixLeaderElector
+    extends Resource<AtomixLeaderElector, Resource.Options> implements AsyncLeaderElector {
+    private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
+            Sets.newConcurrentHashSet();
+
+    private Listener<Change<Leadership>> listener;
+
+    public AtomixLeaderElector(CopycatClient client, Resource.Options options) {
+        super(client, options);
+    }
+
+    @Override
+    public String name() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<AtomixLeaderElector> open() {
+        return super.open().thenApply(result -> {
+            client.session().onEvent("change", this::handleEvent);
+            return result;
+        });
+    }
+
+    private void handleEvent(Change<Leadership> change) {
+        leadershipChangeListeners.forEach(l -> l.accept(change));
+    }
+
+    @Override
+    public AtomixLeaderElector with(Consistency consistency) {
+        super.with(consistency);
+        return this;
+    }
+
+    @Override
+    public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
+        return submit(new AtomixLeaderElectorCommands.Run(topic, nodeId));
+    }
+
+    @Override
+    public CompletableFuture<Void> withdraw(String topic) {
+        return submit(new AtomixLeaderElectorCommands.Withdraw(topic));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
+        return submit(new AtomixLeaderElectorCommands.Anoint(topic, nodeId));
+    }
+
+    @Override
+    public CompletableFuture<Leadership> getLeadership(String topic) {
+        return submit(new AtomixLeaderElectorCommands.GetLeadership(topic));
+    }
+
+    @Override
+    public CompletableFuture<Map<String, Leadership>> getLeaderships() {
+        return submit(new AtomixLeaderElectorCommands.GetAllLeaderships());
+    }
+
+    public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
+        return submit(new AtomixLeaderElectorCommands.GetElectedTopics(nodeId));
+    }
+
+    /**
+     * Leadership change listener context.
+     */
+    private final class LeadershipChangeListener implements Listener<Change<Leadership>> {
+        private final Consumer<Change<Leadership>> listener;
+
+        private LeadershipChangeListener(Consumer<Change<Leadership>> listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void accept(Change<Leadership> change) {
+            listener.accept(change);
+        }
+
+        @Override
+        public void close() {
+            synchronized (AtomixLeaderElector.this) {
+                submit(new AtomixLeaderElectorCommands.Unlisten());
+            }
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
+        leadershipChangeListeners.add(consumer);
+        return setupListener();
+    }
+
+    @Override
+    public CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
+        leadershipChangeListeners.remove(consumer);
+        return teardownListener();
+    }
+
+    private CompletableFuture<Void> setupListener() {
+        if (listener == null && !leadershipChangeListeners.isEmpty()) {
+            Consumer<Change<Leadership>> changeConsumer = change -> {
+                leadershipChangeListeners.forEach(consumer -> consumer.accept(change));
+            };
+            return submit(new AtomixLeaderElectorCommands.Listen())
+                    .thenAccept(v -> listener = new LeadershipChangeListener(changeConsumer));
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private CompletableFuture<Void> teardownListener() {
+        if (listener != null && leadershipChangeListeners.isEmpty()) {
+            listener.close();
+            listener = null;
+            return submit(new AtomixLeaderElectorCommands.Unlisten());
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
new file mode 100644
index 0000000..16f1769
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
@@ -0,0 +1,310 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Strings;
+
+import io.atomix.catalyst.buffer.BufferInput;
+import io.atomix.catalyst.buffer.BufferOutput;
+import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.util.Assert;
+import io.atomix.copycat.client.Command;
+import io.atomix.copycat.client.Query;
+
+/**
+ * {@link AtomixLeaderElector} resource state machine operations.
+ */
+public final class AtomixLeaderElectorCommands {
+
+    private AtomixLeaderElectorCommands() {
+    }
+
+    /**
+     * Abstract election query.
+     */
+    @SuppressWarnings("serial")
+    public abstract static class ElectionQuery<V> implements Query<V>, CatalystSerializable {
+
+        @Override
+        public ConsistencyLevel consistency() {
+            return ConsistencyLevel.BOUNDED_LINEARIZABLE;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+        }
+    }
+
+    /**
+     * Abstract election topic query.
+     */
+    @SuppressWarnings("serial")
+    public abstract static class TopicQuery<V> extends ElectionQuery<V> implements CatalystSerializable {
+        String topic;
+
+        public TopicQuery() {
+        }
+
+        public TopicQuery(String topic) {
+          this.topic = Assert.notNull(topic, "topic");
+        }
+
+        /**
+         * Returns the topic.
+         * @return topic
+         */
+        public String topic() {
+          return topic;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+            serializer.writeObject(topic, buffer);
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+          topic = serializer.readObject(buffer);
+        }
+    }
+
+    /**
+     * Abstract election command.
+     */
+    @SuppressWarnings("serial")
+    public abstract static class ElectionCommand<V> implements Command<V>, CatalystSerializable {
+
+        @Override
+        public ConsistencyLevel consistency() {
+            return ConsistencyLevel.LINEARIZABLE;
+        }
+
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+        }
+    }
+
+    /**
+     * Listen command.
+     */
+    @SuppressWarnings("serial")
+    public static class Listen extends ElectionCommand<Void> {
+    }
+
+    /**
+     * Unlisten command.
+     */
+    @SuppressWarnings("serial")
+    public static class Unlisten extends ElectionCommand<Void> {
+
+        @Override
+        public CompactionMode compaction() {
+            return CompactionMode.QUORUM;
+        }
+    }
+
+    /**
+     * GetLeader query.
+     */
+    @SuppressWarnings("serial")
+    public static class GetLeadership extends TopicQuery<Leadership> {
+
+        public GetLeadership() {
+        }
+
+        public GetLeadership(String topic) {
+            super(topic);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("topic", topic)
+                    .toString();
+        }
+    }
+
+    /**
+     * GetAllLeaders query.
+     */
+    @SuppressWarnings("serial")
+    public static class GetAllLeaderships extends ElectionQuery<Map<String, Leadership>> {
+    }
+
+    /**
+     * GetElectedTopics query.
+     */
+    @SuppressWarnings("serial")
+    public static class GetElectedTopics extends ElectionQuery<Set<String>> {
+        private NodeId nodeId;
+
+        public GetElectedTopics() {
+        }
+
+        public GetElectedTopics(NodeId nodeId) {
+            this.nodeId = Assert.argNot(nodeId, nodeId == null, "nodeId cannot be null");
+        }
+
+        /**
+         * Returns the nodeId to check.
+         *
+         * @return The nodeId to check.
+         */
+        public NodeId nodeId() {
+            return nodeId;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("nodeId", nodeId)
+                    .toString();
+        }
+    }
+
+    /**
+     * Enter and run for leadership.
+     */
+    @SuppressWarnings("serial")
+    public static class Run extends ElectionCommand<Leadership> {
+        private String topic;
+        private NodeId nodeId;
+
+        public Run() {
+        }
+
+        public Run(String topic, NodeId nodeId) {
+            this.topic = Assert.argNot(topic, Strings.isNullOrEmpty(topic), "topic cannot be null or empty");
+            this.nodeId = Assert.argNot(nodeId, nodeId == null, "nodeId cannot be null");
+        }
+
+        /**
+         * Returns the topic.
+         *
+         * @return topic
+         */
+        public String topic() {
+            return topic;
+        }
+
+        /**
+         * Returns the nodeId.
+         *
+         * @return the nodeId
+         */
+        public NodeId nodeId() {
+            return nodeId;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("topic", topic)
+                    .add("nodeId", nodeId)
+                    .toString();
+        }
+    }
+
+    /**
+     * Withdraw from a leadership contest.
+     */
+    @SuppressWarnings("serial")
+    public static class Withdraw extends ElectionCommand<Void> {
+        private String topic;
+
+        public Withdraw() {
+        }
+
+        public Withdraw(String topic) {
+            this.topic = Assert.argNot(topic, Strings.isNullOrEmpty(topic), "topic cannot be null or empty");
+        }
+
+        /**
+         * Returns the topic.
+         *
+         * @return The topic
+         */
+        public String topic() {
+            return topic;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("topic", topic)
+                    .toString();
+        }
+    }
+
+    /**
+     * Command for administratively anointing a node as leader.
+     */
+    @SuppressWarnings("serial")
+    public static class Anoint extends ElectionCommand<Boolean> {
+        private String topic;
+        private NodeId nodeId;
+
+        public Anoint() {
+        }
+
+        public Anoint(String topic, NodeId nodeId) {
+            this.topic = topic;
+            this.nodeId = nodeId;
+        }
+
+        /**
+         * Returns the topic.
+         *
+         * @return The topic
+         */
+        public String topic() {
+            return topic;
+        }
+
+        /**
+         * Returns the nodeId to make leader.
+         *
+         * @return The nodeId
+         */
+        public NodeId nodeId() {
+            return nodeId;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("topic", topic)
+                    .add("nodeId", nodeId)
+                    .toString();
+        }
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
new file mode 100644
index 0000000..2ae6e68
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -0,0 +1,445 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+import io.atomix.copycat.client.session.Session;
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.SessionListener;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import io.atomix.resource.ResourceStateMachine;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.onosproject.cluster.Leader;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.slf4j.Logger;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * State machine for {@link AtomixLeaderElector} resource.
+ */
+public class AtomixLeaderElectorState extends ResourceStateMachine
+    implements SessionListener, Snapshottable {
+
+    private final Logger log = getLogger(getClass());
+    private Map<String, AtomicLong> termCounters = new HashMap<>();
+    private Map<String, ElectionState> elections = new HashMap<>();
+    private final Map<Long, Commit<? extends AtomixLeaderElectorCommands.Listen>> listeners = new LinkedHashMap<>();
+    private final Serializer serializer = Serializer.using(Arrays.asList(KryoNamespaces.API),
+                                                           ElectionState.class,
+                                                           Registration.class);
+
+    @Override
+    protected void configure(StateMachineExecutor executor) {
+        // Notification
+        executor.register(AtomixLeaderElectorCommands.Listen.class, this::listen);
+        executor.register(AtomixLeaderElectorCommands.Unlisten.class, this::unlisten);
+        // Commands
+        executor.register(AtomixLeaderElectorCommands.Run.class, this::run);
+        executor.register(AtomixLeaderElectorCommands.Withdraw.class, this::withdraw);
+        executor.register(AtomixLeaderElectorCommands.Anoint.class, this::anoint);
+        // Queries
+        executor.register(AtomixLeaderElectorCommands.GetLeadership.class, this::leadership);
+        executor.register(AtomixLeaderElectorCommands.GetAllLeaderships.class, this::allLeaderships);
+        executor.register(AtomixLeaderElectorCommands.GetElectedTopics.class, this::electedTopics);
+    }
+
+    private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
+        Change<Leadership> change = new Change<>(previousLeadership, newLeadership);
+        listeners.values().forEach(listener -> listener.session().publish("change", change));
+    }
+
+    @Override
+    public void delete() {
+      // Close and clear Listeners
+      listeners.values().forEach(Commit::close);
+      listeners.clear();
+    }
+
+    /**
+     * Applies listen commits.
+     *
+     * @param commit listen commit
+     */
+    public void listen(Commit<? extends AtomixLeaderElectorCommands.Listen> commit) {
+        if (listeners.putIfAbsent(commit.session().id(), commit) != null) {
+            commit.close();
+        }
+    }
+
+    /**
+     * Applies unlisten commits.
+     *
+     * @param commit unlisten commit
+     */
+    public void unlisten(Commit<? extends AtomixLeaderElectorCommands.Unlisten> commit) {
+        try {
+            Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(commit.session().id());
+            if (listener != null) {
+                listener.close();
+            }
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Applies an {@link AtomixLeaderElectorCommands.Run} commit.
+     * @param commit commit entry
+     * @return topic leader. If no previous leader existed this is the node that just entered the race.
+     */
+    public Leadership run(Commit<? extends AtomixLeaderElectorCommands.Run> commit) {
+        try {
+            String topic = commit.operation().topic();
+            Leadership oldLeadership = leadership(topic);
+            Registration registration = new Registration(commit.operation().nodeId(), commit.session().id());
+            elections.compute(topic, (k, v) -> {
+                if (v == null) {
+                    return new ElectionState(registration, termCounter(topic)::incrementAndGet);
+                } else {
+                    if (!v.isDuplicate(registration)) {
+                        return new ElectionState(v).addRegistration(registration, termCounter(topic)::incrementAndGet);
+                    } else {
+                        return v;
+                    }
+                }
+            });
+            Leadership newLeadership = leadership(topic);
+
+            if (!Objects.equal(oldLeadership, newLeadership)) {
+                notifyLeadershipChange(oldLeadership, newLeadership);
+            }
+            return newLeadership;
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Applies an {@link AtomixLeaderElectorCommands.Withdraw} commit.
+     * @param commit withdraw commit
+     */
+    public void withdraw(Commit<? extends AtomixLeaderElectorCommands.Withdraw> commit) {
+        try {
+            String topic = commit.operation().topic();
+            Leadership oldLeadership = leadership(topic);
+            elections.computeIfPresent(topic, (k, v) -> v.cleanup(commit.session(),
+                                        termCounter(topic)::incrementAndGet));
+            Leadership newLeadership = leadership(topic);
+            if (!Objects.equal(oldLeadership, newLeadership)) {
+                notifyLeadershipChange(oldLeadership, newLeadership);
+            }
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Applies an {@link AtomixLeaderElectorCommands.Anoint} commit.
+     * @param commit anoint commit
+     * @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
+     */
+    public boolean anoint(Commit<? extends AtomixLeaderElectorCommands.Anoint> commit) {
+        try {
+            String topic = commit.operation().topic();
+            Leadership oldLeadership = leadership(topic);
+            ElectionState electionState = elections.computeIfPresent(topic,
+                    (k, v) -> new ElectionState(v).transferLeadership(commit.operation().nodeId(), termCounter(topic)));
+            Leadership newLeadership = leadership(topic);
+            if (!Objects.equal(oldLeadership, newLeadership)) {
+                notifyLeadershipChange(oldLeadership, newLeadership);
+            }
+            return (electionState != null &&
+                    electionState.leader() != null &&
+                    commit.operation().nodeId().equals(electionState.leader().nodeId()));
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
+     * @param commit GetLeadership commit
+     * @return leader
+     */
+    public Leadership leadership(Commit<? extends AtomixLeaderElectorCommands.GetLeadership> commit) {
+        String topic = commit.operation().topic();
+        try {
+            return leadership(topic);
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Applies an {@link AtomixLeaderElectorCommands.GetElectedTopics} commit.
+     * @param commit commit entry
+     * @return set of topics for which the node is the leader
+     */
+    public Set<String> electedTopics(Commit<? extends AtomixLeaderElectorCommands.GetElectedTopics> commit) {
+        try {
+            NodeId nodeId = commit.operation().nodeId();
+            return Maps.filterEntries(elections, e -> {
+                Leader leader = leadership(e.getKey()).leader();
+                return leader != null && leader.nodeId().equals(nodeId);
+            }).keySet();
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Applies an {@link AtomixLeaderElectorCommands.GetAllLeaderships} commit.
+     * @param commit GetAllLeaderships commit
+     * @return topic to leader mapping
+     */
+    public Map<String, Leadership> allLeaderships(
+            Commit<? extends AtomixLeaderElectorCommands.GetAllLeaderships> commit) {
+        try {
+            return Maps.transformEntries(elections, (k, v) -> leadership(k));
+        } finally {
+            commit.close();
+        }
+    }
+
+    private Leadership leadership(String topic) {
+        return new Leadership(topic,
+                leader(topic),
+                candidates(topic));
+    }
+
+    private Leader leader(String topic) {
+        ElectionState electionState = elections.get(topic);
+        return electionState == null ? null : electionState.leader();
+    }
+
+    private List<NodeId> candidates(String topic) {
+        ElectionState electionState = elections.get(topic);
+        return electionState == null ? new LinkedList<>() : electionState.candidates();
+    }
+
+    private void onSessionEnd(Session session) {
+        Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session);
+        if (listener != null) {
+            listener.close();
+        }
+        Set<String> topics = elections.keySet();
+        topics.forEach(topic -> {
+            Leadership oldLeadership = leadership(topic);
+            elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
+            Leadership newLeadership = leadership(topic);
+            if (!Objects.equal(oldLeadership, newLeadership)) {
+                notifyLeadershipChange(oldLeadership, newLeadership);
+            }
+        });
+    }
+
+    private static class Registration {
+        private final NodeId nodeId;
+        private final long sessionId;
+
+        public Registration(NodeId nodeId, long sessionId) {
+            this.nodeId = nodeId;
+            this.sessionId = sessionId;
+        }
+
+        public NodeId nodeId() {
+            return nodeId;
+        }
+
+        public long sessionId() {
+            return sessionId;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("nodeId", nodeId)
+                    .add("sessionId", sessionId)
+                    .toString();
+        }
+    }
+
+    private static class ElectionState {
+        final Registration leader;
+        final long term;
+        final long termStartTime;
+        final List<Registration> registrations;
+
+        public ElectionState(Registration registration, Supplier<Long> termCounter) {
+            registrations = Arrays.asList(registration);
+            term = termCounter.get();
+            termStartTime = System.currentTimeMillis();
+            leader = registration;
+        }
+
+        public ElectionState(ElectionState other) {
+            registrations = Lists.newArrayList(other.registrations);
+            leader = other.leader;
+            term = other.term;
+            termStartTime = other.termStartTime;
+        }
+
+        public ElectionState(List<Registration> registrations,
+                Registration leader,
+                long term,
+                long termStartTime) {
+            this.registrations = Lists.newArrayList(registrations);
+            this.leader = leader;
+            this.term = term;
+            this.termStartTime = termStartTime;
+        }
+
+        public ElectionState cleanup(Session session, Supplier<Long> termCounter) {
+            Optional<Registration> registration =
+                    registrations.stream().filter(r -> r.sessionId() == session.id()).findFirst();
+            if (registration.isPresent()) {
+                List<Registration> updatedRegistrations =
+                        registrations.stream()
+                        .filter(r -> r.sessionId() != session.id())
+                        .collect(Collectors.toList());
+                if (leader.sessionId() == session.id()) {
+                    if (updatedRegistrations.size() > 0) {
+                        return new ElectionState(updatedRegistrations,
+                                updatedRegistrations.get(0),
+                                termCounter.get(),
+                                System.currentTimeMillis());
+                    } else {
+                        return new ElectionState(updatedRegistrations, null, term, termStartTime);
+                    }
+                } else {
+                    return new ElectionState(updatedRegistrations, leader, term, termStartTime);
+                }
+            } else {
+                return this;
+            }
+        }
+
+        public boolean isDuplicate(Registration registration) {
+            return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
+        }
+
+        public Leader leader() {
+            if (leader == null) {
+                return null;
+            } else {
+                NodeId leaderNodeId = leader.nodeId();
+                return new Leader(leaderNodeId, term, termStartTime);
+            }
+        }
+
+        public List<NodeId> candidates() {
+            return registrations.stream().map(registration -> registration.nodeId()).collect(Collectors.toList());
+        }
+
+        public ElectionState addRegistration(Registration registration, Supplier<Long> termCounter) {
+            if (!registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId())) {
+                List<Registration> updatedRegistrations = new LinkedList<>(registrations);
+                updatedRegistrations.add(registration);
+                boolean newLeader = leader == null;
+                return new ElectionState(updatedRegistrations,
+                        newLeader ? registration : leader,
+                        newLeader ? termCounter.get() : term,
+                        newLeader ? System.currentTimeMillis() : termStartTime);
+            }
+            return this;
+        }
+
+        public ElectionState transferLeadership(NodeId nodeId, AtomicLong termCounter) {
+            Registration newLeader = registrations.stream()
+                                                  .filter(r -> r.nodeId().equals(nodeId))
+                                                  .findFirst()
+                                                  .orElse(null);
+            if (newLeader != null) {
+                return new ElectionState(registrations,
+                                         newLeader,
+                                         termCounter.incrementAndGet(),
+                                         System.currentTimeMillis());
+            } else {
+                return this;
+            }
+        }
+    }
+
+    @Override
+    public void register(Session session) {
+    }
+
+    @Override
+    public void unregister(Session session) {
+        onSessionEnd(session);
+    }
+
+    @Override
+    public void expire(Session session) {
+        onSessionEnd(session);
+    }
+
+    @Override
+    public void close(Session session) {
+        onSessionEnd(session);
+    }
+
+    @Override
+    public void snapshot(SnapshotWriter writer) {
+        byte[] encodedTermCounters = serializer.encode(termCounters);
+        writer.writeInt(encodedTermCounters.length);
+        writer.write(encodedTermCounters);
+        byte[] encodedElections  = serializer.encode(elections);
+        writer.writeInt(encodedElections.length);
+        writer.write(encodedElections);
+        log.info("Took state machine snapshot");
+    }
+
+    @Override
+    public void install(SnapshotReader reader) {
+        int encodedTermCountersSize = reader.readInt();
+        byte[] encodedTermCounters = new byte[encodedTermCountersSize];
+        reader.read(encodedTermCounters);
+        termCounters = serializer.decode(encodedTermCounters);
+        int encodedElectionsSize = reader.readInt();
+        byte[] encodedElections = new byte[encodedElectionsSize];
+        reader.read(encodedElections);
+        elections = serializer.decode(encodedElections);
+        log.info("Reinstated state machine from snapshot");
+    }
+
+    private AtomicLong termCounter(String topic) {
+        return termCounters.computeIfAbsent(topic, k -> new AtomicLong(0));
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixValue.java
new file mode 100644
index 0000000..155fe01
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixValue.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.AtomicValueEvent;
+import org.onosproject.store.service.AtomicValueEventListener;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of {@link AsyncAtomicValue} backed by {@link AtomixConsistentMap}.
+ */
+public class AtomixValue implements AsyncAtomicValue<String> {
+
+    private final String name;
+    private final AtomixConsistentMap atomixMap;
+    private MapEventListener<String, byte[]> mapEventListener;
+    private final Set<AtomicValueEventListener<String>> listeners = Sets.newIdentityHashSet();
+
+    AtomixValue(String name, AtomixConsistentMap atomixMap) {
+        this.name = name;
+        this.atomixMap = atomixMap;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> compareAndSet(String expect, String update) {
+        return atomixMap.replace(name, Tools.getBytesUtf8(expect), Tools.getBytesUtf8(update));
+    }
+
+    @Override
+    public CompletableFuture<String> get() {
+        return atomixMap.get(name)
+                        .thenApply(v -> v != null ? Tools.toStringUtf8(v.value()) : null);
+    }
+
+    @Override
+    public CompletableFuture<String> getAndSet(String value) {
+        return atomixMap.put(name, Tools.getBytesUtf8(value))
+                        .thenApply(v -> v != null ? Tools.toStringUtf8(v.value()) : null);
+    }
+
+    @Override
+    public CompletableFuture<Void> set(String value) {
+        return getAndSet(value).thenApply(v -> null);
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(AtomicValueEventListener<String> listener) {
+        // TODO: synchronization
+        if (mapEventListener == null) {
+            mapEventListener = event -> {
+                Versioned<byte[]> newValue = event.newValue();
+                Versioned<byte[]> oldValue = event.oldValue();
+                if (Objects.equals(event.key(), name)) {
+                    listener.event(new AtomicValueEvent<>(name,
+                            newValue == null ? null : Tools.toStringUtf8(newValue.value()),
+                            oldValue == null ? null : Tools.toStringUtf8(oldValue.value())));
+                }
+            };
+            return atomixMap.addListener(mapEventListener).whenComplete((r, e) -> {
+                if (e == null) {
+                    listeners.add(listener);
+                } else {
+                    mapEventListener = null;
+                }
+            });
+        } else {
+            listeners.add(listener);
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(AtomicValueEventListener<String> listener) {
+        // TODO: synchronization
+        listeners.remove(listener);
+        if (listeners.isEmpty()) {
+            return atomixMap.removeListener(mapEventListener);
+        } else {
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    @Override
+    public String name() {
+        return null;
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/CommitResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/CommitResult.java
new file mode 100644
index 0000000..2b43f83
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/CommitResult.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+/**
+ * Response enum for two phase commit operation.
+ */
+public enum CommitResult {
+    /**
+     * Signifies a successful commit execution.
+     */
+    OK,
+
+    /**
+     * Signifies a failure due to unrecognized transaction identifier.
+     */
+    UNKNOWN_TRANSACTION_ID,
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java
new file mode 100644
index 0000000..7858595
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapEntryUpdateResult.java
@@ -0,0 +1,157 @@
+
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.function.Function;
+
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Result of a map entry update operation.
+ * <p>
+ * Both old and new values are accessible along with a flag that indicates if the
+ * the value was updated. If flag is false, oldValue and newValue both
+ * point to the same unmodified value.
+ * @param <V> result type
+ */
+public class MapEntryUpdateResult<K, V> {
+
+    public enum Status {
+
+        /**
+         * Indicates a successful update.
+         */
+        OK,
+
+        /**
+         * Indicates a noop i.e. existing and new value are both null.
+         */
+        NOOP,
+
+        /**
+         * Indicates a failed update due to a write lock.
+         */
+        WRITE_LOCK,
+
+        /**
+         * Indicates a failed update due to a precondition check failure.
+         */
+        PRECONDITION_FAILED
+    }
+
+    private final String mapName;
+    private Status status;
+    private final K key;
+    private final Versioned<V> oldValue;
+    private final Versioned<V> newValue;
+
+    public MapEntryUpdateResult(Status status, String mapName, K key, Versioned<V> oldValue, Versioned<V> newValue) {
+        this.status = status;
+        this.mapName = mapName;
+        this.key = key;
+        this.oldValue = oldValue;
+        this.newValue = newValue;
+    }
+
+    /**
+     * Returns {@code true} if the update was successful.
+     * @return {@code true} if yes, {@code false} otherwise
+     */
+    public boolean updated() {
+        return status == Status.OK;
+    }
+
+    /**
+     * Returns the map name.
+     * @return map name
+     */
+    public String mapName() {
+        return mapName;
+    }
+
+    /**
+     * Returns the update status.
+     * @return update status
+     */
+    public Status status() {
+        return status;
+    }
+
+    /**
+     * Returns the map key.
+     * @return key
+     */
+    public K key() {
+        return key;
+    }
+
+    /**
+     * Returns the old value.
+     * @return the previous value associated with key if updated was successful, otherwise current value
+     */
+    public Versioned<V> oldValue() {
+        return oldValue;
+    }
+
+    /**
+     * Returns the new value after update.
+     * @return if updated was unsuccessful, this is same as old value
+     */
+    public Versioned<V> newValue() {
+        return newValue;
+    }
+
+    /**
+     * Maps to another instance with different key and value types.
+     * @param keyTransform transformer to use for transcoding keys
+     * @param valueMapper mapper to use for transcoding values
+     * @return new instance
+     */
+    public <K1, V1> MapEntryUpdateResult<K1, V1> map(Function<K, K1> keyTransform, Function<V, V1> valueMapper) {
+        return new MapEntryUpdateResult<>(status,
+                mapName,
+                keyTransform.apply(key),
+                oldValue == null ? null : oldValue.map(valueMapper),
+                newValue == null ? null : newValue.map(valueMapper));
+    }
+
+    /**
+     * Return the map event that will be generated as a result of this update.
+     * @return map event. if update was unsuccessful, this returns {@code null}
+     */
+    public MapEvent<K, V> toMapEvent() {
+        if (!updated()) {
+            return null;
+        } else {
+            return new MapEvent<>(mapName(), key(), newValue, oldValue);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(MapEntryUpdateResult.class)
+                .add("mapName", mapName)
+                .add("status", status)
+                .add("key", key)
+                .add("oldValue", oldValue)
+                .add("newValue", newValue)
+                .toString();
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapUpdate.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapUpdate.java
new file mode 100644
index 0000000..c105a08
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/MapUpdate.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives.resources.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Map update operation.
+ *
+ * @param <K> map key type
+ * @param <V> map value type
+ *
+ */
+public final class MapUpdate<K, V> {
+
+    /**
+     * Type of database update operation.
+     */
+    public enum Type {
+        /**
+         * Insert/Update entry without any checks.
+         */
+        PUT,
+        /**
+         * Insert an entry iff there is no existing entry for that key.
+         */
+        PUT_IF_ABSENT,
+
+        /**
+         * Update entry if the current version matches specified version.
+         */
+        PUT_IF_VERSION_MATCH,
+
+        /**
+         * Update entry if the current value matches specified value.
+         */
+        PUT_IF_VALUE_MATCH,
+
+        /**
+         * Remove entry without any checks.
+         */
+        REMOVE,
+
+        /**
+         * Remove entry if the current version matches specified version.
+         */
+        REMOVE_IF_VERSION_MATCH,
+
+        /**
+         * Remove entry if the current value matches specified value.
+         */
+        REMOVE_IF_VALUE_MATCH,
+    }
+
+    private Type type;
+    private K key;
+    private V value;
+    private V currentValue;
+    private long currentVersion = -1;
+
+    /**
+     * Returns the type of update operation.
+     * @return type of update.
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Returns the item key being updated.
+     * @return item key
+     */
+    public K key() {
+        return key;
+    }
+
+    /**
+     * Returns the new value.
+     * @return item's target value.
+     */
+    public V value() {
+        return value;
+    }
+
+    /**
+     * Returns the expected current value for the key.
+     * @return current value in database.
+     */
+    public V currentValue() {
+        return currentValue;
+    }
+
+    /**
+     * Returns the expected current version in the database for the key.
+     * @return expected version.
+     */
+    public long currentVersion() {
+        return currentVersion;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("type", type)
+            .add("key", key)
+            .add("value", value)
+            .add("currentValue", currentValue)
+            .add("currentVersion", currentVersion)
+            .toString();
+    }
+
+    /**
+     * Creates a new builder instance.
+     *
+     * @param <K> key type
+     * @param <V> value type
+     * @return builder.
+     */
+    public static <K, V> Builder<K, V> newBuilder() {
+        return new Builder<>();
+    }
+
+    /**
+     * MapUpdate builder.
+     *
+     * @param <K> key type
+     * @param <V> value type
+     */
+    public static final class Builder<K, V> {
+
+        private MapUpdate<K, V> update = new MapUpdate<>();
+
+        public MapUpdate<K, V> build() {
+            validateInputs();
+            return update;
+        }
+
+        public Builder<K, V> withType(Type type) {
+            update.type = checkNotNull(type, "type cannot be null");
+            return this;
+        }
+
+        public Builder<K, V> withKey(K key) {
+            update.key = checkNotNull(key, "key cannot be null");
+            return this;
+        }
+
+        public Builder<K, V> withCurrentValue(V value) {
+            update.currentValue = checkNotNull(value, "currentValue cannot be null");
+            return this;
+        }
+
+        public Builder<K, V> withValue(V value) {
+            update.value = checkNotNull(value, "value cannot be null");
+            return this;
+        }
+
+        public Builder<K, V> withCurrentVersion(long version) {
+            checkArgument(version >= 0, "version cannot be negative");
+            update.currentVersion = version;
+            return this;
+        }
+
+        private void validateInputs() {
+            checkNotNull(update.type, "type must be specified");
+            checkNotNull(update.key, "key must be specified");
+            switch (update.type) {
+            case PUT:
+            case PUT_IF_ABSENT:
+                checkNotNull(update.value, "value must be specified.");
+                break;
+            case PUT_IF_VERSION_MATCH:
+                checkNotNull(update.value, "value must be specified.");
+                checkState(update.currentVersion >= 0, "current version must be specified");
+                break;
+            case PUT_IF_VALUE_MATCH:
+                checkNotNull(update.value, "value must be specified.");
+                checkNotNull(update.currentValue, "currentValue must be specified.");
+                break;
+            case REMOVE:
+                break;
+            case REMOVE_IF_VERSION_MATCH:
+                checkState(update.currentVersion >= 0, "current version must be specified");
+                break;
+            case REMOVE_IF_VALUE_MATCH:
+                checkNotNull(update.currentValue, "currentValue must be specified.");
+                break;
+            default:
+                throw new IllegalStateException("Unknown operation type");
+            }
+        }
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/PrepareResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/PrepareResult.java
new file mode 100644
index 0000000..0465743
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/PrepareResult.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+/**
+ * Response enum for two phase commit prepare operation.
+ */
+public enum PrepareResult {
+    /**
+     * Signifies a successful execution of the prepare operation.
+     */
+    OK,
+
+    /**
+     * Signifies a failure to another transaction locking the underlying state.
+     */
+    CONCURRENT_TRANSACTION,
+
+    /**
+     * Signifies a optimistic lock failure. This can happen if underlying state has changed since it was last read.
+     */
+    OPTIMISTIC_LOCK_FAILURE,
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/RollbackResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/RollbackResult.java
new file mode 100644
index 0000000..6f0790f
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/RollbackResult.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+/**
+ * Response enum for two phase commit rollback operation.
+ */
+public enum RollbackResult {
+    /**
+     * Signifies a successful rollback execution.
+     */
+    OK,
+
+    /**
+     * Signifies a failure due to unrecognized transaction identifier.
+     */
+    UNKNOWN_TRANSACTION_ID,
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionId.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionId.java
new file mode 100644
index 0000000..e378580
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionId.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import com.google.common.base.Objects;
+
+/**
+ * Transaction identifier.
+ */
+public final class TransactionId {
+
+    public static TransactionId from(String id) {
+        return new TransactionId(id);
+    }
+
+    private final String id;
+
+    private TransactionId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String toString() {
+        return id;
+    }
+
+    @Override
+    public int hashCode() {
+        return id.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (other instanceof TransactionId) {
+            TransactionId that = (TransactionId) other;
+            return Objects.equal(this.id, that.id);
+        }
+        return false;
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionalMapUpdate.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionalMapUpdate.java
new file mode 100644
index 0000000..6e83b69
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/TransactionalMapUpdate.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+
+/**
+ * A batch updates to an {@code AsyncConsistentMap} be committed as a transaction.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class TransactionalMapUpdate<K, V> {
+    private final TransactionId transactionId;
+    private final Collection<MapUpdate<K, V>> updates;
+    private boolean indexPopulated = false;
+    private final Map<K, V> keyValueIndex = Maps.newHashMap();
+
+    public TransactionalMapUpdate(TransactionId transactionId, Collection<MapUpdate<K, V>> updates) {
+        this.transactionId = transactionId;
+        this.updates = ImmutableList.copyOf(updates);
+        populateIndex();
+    }
+
+    /**
+     * Returns the transaction identifier.
+     * @return transaction id
+     */
+    public TransactionId transactionId() {
+        return transactionId;
+    }
+
+    /**
+     * Returns the collection of map updates.
+     * @return map updates
+     */
+    public Collection<MapUpdate<K, V>> batch() {
+        return updates;
+    }
+
+    /**
+     * Returns the value that will be associated with the key after this transaction commits.
+     * @param key key
+     * @return value that will be associated with the value once this transaction commits
+     */
+    public V valueForKey(K key) {
+        if (!indexPopulated) {
+            // We do not synchronize as we don't expect this called to be made from multiple threads.
+            populateIndex();
+        }
+        return keyValueIndex.get(key);
+    }
+
+    /**
+     * Populates the internal key -> value mapping.
+     */
+    private synchronized void populateIndex() {
+        updates.forEach(mapUpdate -> {
+            if (mapUpdate.value() != null) {
+                keyValueIndex.put(mapUpdate.key(), mapUpdate.value());
+            }
+        });
+        indexPopulated = true;
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/package-info.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/package-info.java
new file mode 100644
index 0000000..9cf8c15
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * State machine implementation classes for distributed primitives.
+ */
+package org.onosproject.store.primitives.resources.impl;
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
new file mode 100644
index 0000000..445dda4
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapTest.java
@@ -0,0 +1,447 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.resource.ResourceType;
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests for {@link AtomixConsistentMap}.
+ */
+public class AtomixConsistentMapTest extends AtomixTestBase {
+
+    @Override
+    protected ResourceType resourceType() {
+        return new ResourceType(AtomixConsistentMap.class);
+    }
+
+    /**
+     * Tests various basic map operations.
+     */
+    @Test
+    public void testBasicMapOperations() throws Throwable {
+        basicMapOperationTests(1);
+        clearTests();
+        basicMapOperationTests(2);
+        clearTests();
+        basicMapOperationTests(3);
+    }
+
+    /**
+     * Tests various map compute* operations on different cluster sizes.
+     */
+    @Test
+    public void testMapComputeOperations() throws Throwable {
+        mapComputeOperationTests(1);
+        clearTests();
+        mapComputeOperationTests(2);
+        clearTests();
+        mapComputeOperationTests(3);
+    }
+
+    /**
+     * Tests map event notifications.
+     */
+    @Test
+    public void testMapListeners() throws Throwable {
+        mapListenerTests(1);
+        clearTests();
+        mapListenerTests(2);
+        clearTests();
+        mapListenerTests(3);
+    }
+
+    /**
+     * Tests map transaction commit.
+     */
+    @Test
+    public void testTransactionCommit() throws Throwable {
+        transactionCommitTests(1);
+        clearTests();
+        transactionCommitTests(2);
+        clearTests();
+        transactionCommitTests(3);
+    }
+
+    /**
+     * Tests map transaction rollback.
+     */
+    @Test
+    public void testTransactionRollback() throws Throwable {
+        transactionRollbackTests(1);
+        clearTests();
+        transactionRollbackTests(2);
+        clearTests();
+        transactionRollbackTests(3);
+    }
+
+    protected void basicMapOperationTests(int clusterSize) throws Throwable {
+        createCopycatServers(clusterSize);
+
+        final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
+        final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
+
+        AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+
+        map.isEmpty().thenAccept(result -> {
+            assertTrue(result);
+        }).join();
+
+        map.put("foo", rawFooValue).thenAccept(result -> {
+            assertNull(result);
+        }).join();
+
+        map.size().thenAccept(result -> {
+            assertTrue(result == 1);
+        }).join();
+
+        map.isEmpty().thenAccept(result -> {
+            assertFalse(result);
+        }).join();
+
+        map.putIfAbsent("foo", "Hello foo again!".getBytes()).thenAccept(result -> {
+            assertNotNull(result);
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
+        }).join();
+
+        map.putIfAbsent("bar", rawBarValue).thenAccept(result -> {
+            assertNull(result);
+        }).join();
+
+        map.size().thenAccept(result -> {
+            assertTrue(result == 2);
+        }).join();
+
+        map.keySet().thenAccept(result -> {
+            assertTrue(result.size() == 2);
+            assertTrue(result.containsAll(Sets.newHashSet("foo", "bar")));
+        }).join();
+
+        map.values().thenAccept(result -> {
+            assertTrue(result.size() == 2);
+            List<String> rawValues =
+                    result.stream().map(v -> Tools.toStringUtf8(v.value())).collect(Collectors.toList());
+            assertTrue(rawValues.contains("Hello foo!"));
+            assertTrue(rawValues.contains("Hello bar!"));
+        }).join();
+
+        map.entrySet().thenAccept(result -> {
+            assertTrue(result.size() == 2);
+            // TODO: check entries
+        }).join();
+
+        map.get("foo").thenAccept(result -> {
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
+        }).join();
+
+        map.remove("foo").thenAccept(result -> {
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
+        }).join();
+
+        map.containsKey("foo").thenAccept(result -> {
+            assertFalse(result);
+        }).join();
+
+        map.get("foo").thenAccept(result -> {
+            assertNull(result);
+        }).join();
+
+        map.get("bar").thenAccept(result -> {
+            assertNotNull(result);
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
+        }).join();
+
+        map.containsKey("bar").thenAccept(result -> {
+            assertTrue(result);
+        }).join();
+
+        map.size().thenAccept(result -> {
+            assertTrue(result == 1);
+        }).join();
+
+        map.containsValue(rawBarValue).thenAccept(result -> {
+            assertTrue(result);
+        }).join();
+
+        map.containsValue(rawFooValue).thenAccept(result -> {
+            assertFalse(result);
+        }).join();
+
+        map.replace("bar", "Goodbye bar!".getBytes()).thenAccept(result -> {
+            assertNotNull(result);
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
+        }).join();
+
+        map.replace("foo", "Goodbye foo!".getBytes()).thenAccept(result -> {
+            assertNull(result);
+        }).join();
+
+        // try replace_if_value_match for a non-existent key
+        map.replace("foo", "Goodbye foo!".getBytes(), rawFooValue).thenAccept(result -> {
+            assertFalse(result);
+        }).join();
+
+        map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
+            assertTrue(result);
+        }).join();
+
+        map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
+            assertFalse(result);
+        }).join();
+
+        Versioned<byte[]> barValue = map.get("bar").join();
+        map.replace("bar", barValue.version(), "Goodbye bar!".getBytes()).thenAccept(result -> {
+            assertTrue(result);
+        }).join();
+
+        map.replace("bar", barValue.version(), rawBarValue).thenAccept(result -> {
+            assertFalse(result);
+        }).join();
+
+        map.clear().join();
+
+        map.size().thenAccept(result -> {
+            assertTrue(result == 0);
+        }).join();
+    }
+
+    public void mapComputeOperationTests(int clusterSize) throws Throwable {
+        createCopycatServers(clusterSize);
+        final byte[] value1 = Tools.getBytesUtf8("value1");
+        final byte[] value2 = Tools.getBytesUtf8("value2");
+        final byte[] value3 = Tools.getBytesUtf8("value3");
+
+        AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+
+        map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
+        }).join();
+
+        map.computeIfAbsent("foo", k -> value2).thenAccept(result -> {
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
+        }).join();
+
+        map.computeIfPresent("bar", (k, v) -> value2).thenAccept(result -> {
+            assertNull(result);
+        });
+
+        map.computeIfPresent("foo", (k, v) -> value3).thenAccept(result -> {
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value3));
+        }).join();
+
+        map.computeIfPresent("foo", (k, v) -> null).thenAccept(result -> {
+            assertNull(result);
+        }).join();
+
+        map.computeIf("foo", v -> v == null, (k, v) -> value1).thenAccept(result -> {
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
+        }).join();
+
+        map.compute("foo", (k, v) -> value2).thenAccept(result -> {
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value2));
+        }).join();
+    }
+
+
+    protected void mapListenerTests(int clusterSize) throws Throwable {
+        createCopycatServers(clusterSize);
+        final byte[] value1 = Tools.getBytesUtf8("value1");
+        final byte[] value2 = Tools.getBytesUtf8("value2");
+        final byte[] value3 = Tools.getBytesUtf8("value3");
+
+        AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+        TestMapEventListener listener = new TestMapEventListener();
+
+        // add listener; insert new value into map and verify an INSERT event is received.
+        map.addListener(listener).join();
+        map.put("foo", value1).join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.INSERT, listener.event().type());
+        assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
+        listener.clearEvent();
+
+        // remove listener and verify listener is not notified.
+        map.removeListener(listener).join();
+        map.put("foo", value2).join();
+        assertNull(listener.event());
+
+        // add the listener back and verify UPDATE events are received correctly
+        map.addListener(listener).join();
+        map.put("foo", value3).join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.UPDATE, listener.event().type());
+        assertTrue(Arrays.equals(value3, listener.event().newValue().value()));
+        listener.clearEvent();
+
+        // perform a non-state changing operation and verify no events are received.
+        map.putIfAbsent("foo", value1).join();
+        assertNull(listener.event());
+
+        // verify REMOVE events are received correctly.
+        map.remove("foo").join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.REMOVE, listener.event().type());
+        assertTrue(Arrays.equals(value3, listener.event().oldValue().value()));
+        listener.clearEvent();
+
+        // verify compute methods also generate events.
+        map.computeIf("foo", v -> v == null, (k, v) -> value1).join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.INSERT, listener.event().type());
+        assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
+        listener.clearEvent();
+
+        map.compute("foo", (k, v) -> value2).join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.UPDATE, listener.event().type());
+        assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
+        listener.clearEvent();
+
+        map.computeIf("foo", v -> Arrays.equals(v, value2), (k, v) -> null).join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.REMOVE, listener.event().type());
+        assertTrue(Arrays.equals(value2, listener.event().oldValue().value()));
+        listener.clearEvent();
+
+        map.removeListener(listener).join();
+    }
+
+    protected void transactionCommitTests(int clusterSize) throws Throwable {
+        createCopycatServers(clusterSize);
+        final byte[] value1 = Tools.getBytesUtf8("value1");
+        final byte[] value2 = Tools.getBytesUtf8("value2");
+
+        AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+        TestMapEventListener listener = new TestMapEventListener();
+
+        map.addListener(listener).join();
+
+        MapUpdate<String, byte[]> update1 =
+                MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
+                .withKey("foo")
+                .withValue(value1)
+                .build();
+
+        TransactionalMapUpdate<String, byte[]> txMapUpdate =
+                new TransactionalMapUpdate<>(TransactionId.from("tx1"), Arrays.asList(update1));
+
+        map.prepare(txMapUpdate).thenAccept(result -> {
+            assertEquals(PrepareResult.OK, result);
+        }).join();
+        assertNull(listener.event());
+
+        map.size().thenAccept(result -> {
+            assertTrue(result == 0);
+        }).join();
+
+        map.get("foo").thenAccept(result -> {
+            assertNull(result);
+        }).join();
+
+        try {
+            map.put("foo", value2).join();
+            assertTrue(false);
+        } catch (CompletionException e) {
+            assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
+        }
+
+        assertNull(listener.event());
+
+        map.commit(txMapUpdate.transactionId()).join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.INSERT, listener.event().type());
+        assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
+        listener.clearEvent();
+
+        map.put("foo", value2).thenAccept(result -> {
+            assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
+        }).join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.UPDATE, listener.event().type());
+        assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
+        listener.clearEvent();
+    }
+
+    protected void transactionRollbackTests(int clusterSize) throws Throwable {
+        createCopycatServers(clusterSize);
+        final byte[] value1 = Tools.getBytesUtf8("value1");
+        final byte[] value2 = Tools.getBytesUtf8("value2");
+
+        AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
+        TestMapEventListener listener = new TestMapEventListener();
+
+        map.addListener(listener).join();
+
+        MapUpdate<String, byte[]> update1 =
+                MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
+                .withKey("foo")
+                .withValue(value1)
+                .build();
+        TransactionalMapUpdate<String, byte[]> txMapUpdate =
+                new TransactionalMapUpdate<>(TransactionId.from("tx1"), Arrays.asList(update1));
+        map.prepare(txMapUpdate).thenAccept(result -> {
+            assertEquals(PrepareResult.OK, result);
+        }).join();
+        assertNull(listener.event());
+
+        map.rollback(txMapUpdate.transactionId()).join();
+        assertNull(listener.event());
+
+        map.get("foo").thenAccept(result -> {
+            assertNull(result);
+        }).join();
+
+        map.put("foo", value2).thenAccept(result -> {
+            assertNull(result);
+        }).join();
+        assertNotNull(listener.event());
+        assertEquals(MapEvent.Type.INSERT, listener.event().type());
+        assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
+        listener.clearEvent();
+    }
+
+    private static class TestMapEventListener implements MapEventListener<String, byte[]> {
+
+        MapEvent<String, byte[]> event;
+
+        @Override
+        public void event(MapEvent<String, byte[]> event) {
+            this.event = event;
+        }
+
+        public MapEvent<String, byte[]> event() {
+            return event;
+        }
+
+        public void clearEvent() {
+            event = null;
+        }
+    }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
new file mode 100644
index 0000000..7d77d03
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
@@ -0,0 +1,339 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+
+import io.atomix.Atomix;
+import io.atomix.resource.ResourceType;
+
+/**
+ * Unit tests for {@link AtomixLeaderElector}.
+ */
+public class AtomixLeaderElectorTest extends AtomixTestBase {
+
+    NodeId node1 = new NodeId("node1");
+    NodeId node2 = new NodeId("node2");
+    NodeId node3 = new NodeId("node3");
+
+    @Override
+    protected ResourceType resourceType() {
+        return new ResourceType(AtomixLeaderElector.class);
+    }
+
+    @Test
+    public void testRun() throws Throwable {
+        leaderElectorRunTests(1);
+        clearTests();
+//        leaderElectorRunTests(2);
+//        clearTests();
+//        leaderElectorRunTests(3);
+//        clearTests();
+    }
+
+    private void leaderElectorRunTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        Atomix client1 = createAtomixClient();
+        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+        elector1.run("foo", node1).thenAccept(result -> {
+            assertEquals(node1, result.leaderNodeId());
+            assertEquals(1, result.leader().term());
+            assertEquals(1, result.candidates().size());
+            assertEquals(node1, result.candidates().get(0));
+        }).join();
+        Atomix client2 = createAtomixClient();
+        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+        elector2.run("foo", node2).thenAccept(result -> {
+            assertEquals(node1, result.leaderNodeId());
+            assertEquals(1, result.leader().term());
+            assertEquals(2, result.candidates().size());
+            assertEquals(node1, result.candidates().get(0));
+            assertEquals(node2, result.candidates().get(1));
+        }).join();
+    }
+
+    @Test
+    public void testWithdraw() throws Throwable {
+        leaderElectorWithdrawTests(1);
+        clearTests();
+        leaderElectorWithdrawTests(2);
+        clearTests();
+        leaderElectorWithdrawTests(3);
+        clearTests();
+    }
+
+    private void leaderElectorWithdrawTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        Atomix client1 = createAtomixClient();
+        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+        elector1.run("foo", node1).join();
+        Atomix client2 = createAtomixClient();
+        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+        elector2.run("foo", node2).join();
+
+        LeaderEventListener listener1 = new LeaderEventListener();
+        elector1.addChangeListener(listener1).join();
+
+        LeaderEventListener listener2 = new LeaderEventListener();
+        elector2.addChangeListener(listener2).join();
+
+        elector1.withdraw("foo").join();
+
+        listener1.nextEvent().thenAccept(result -> {
+            assertEquals(node2, result.newValue().leaderNodeId());
+            assertEquals(2, result.newValue().leader().term());
+            assertEquals(1, result.newValue().candidates().size());
+            assertEquals(node2, result.newValue().candidates().get(0));
+        }).join();
+
+        listener2.nextEvent().thenAccept(result -> {
+            assertEquals(node2, result.newValue().leaderNodeId());
+            assertEquals(2, result.newValue().leader().term());
+            assertEquals(1, result.newValue().candidates().size());
+            assertEquals(node2, result.newValue().candidates().get(0));
+        }).join();
+    }
+
+    @Test
+    public void testAnoint() throws Throwable {
+        leaderElectorAnointTests(1);
+        clearTests();
+        leaderElectorAnointTests(2);
+        clearTests();
+        leaderElectorAnointTests(3);
+        clearTests();
+    }
+
+    private void leaderElectorAnointTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        Atomix client1 = createAtomixClient();
+        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+        Atomix client2 = createAtomixClient();
+        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+        Atomix client3 = createAtomixClient();
+        AtomixLeaderElector elector3 = client3.get("test-elector", AtomixLeaderElector.class).join();
+        elector1.run("foo", node1).join();
+        elector2.run("foo", node2).join();
+
+        LeaderEventListener listener1 = new LeaderEventListener();
+        elector1.addChangeListener(listener1).join();
+        LeaderEventListener listener2 = new LeaderEventListener();
+        elector2.addChangeListener(listener2);
+        LeaderEventListener listener3 = new LeaderEventListener();
+        elector3.addChangeListener(listener3).join();
+
+        elector3.anoint("foo", node3).thenAccept(result -> {
+            assertFalse(result);
+        }).join();
+        assertFalse(listener1.hasEvent());
+        assertFalse(listener2.hasEvent());
+        assertFalse(listener3.hasEvent());
+
+        elector3.anoint("foo", node2).thenAccept(result -> {
+            assertTrue(result);
+        }).join();
+        assertTrue(listener1.hasEvent());
+        assertTrue(listener2.hasEvent());
+        assertTrue(listener3.hasEvent());
+
+        listener1.nextEvent().thenAccept(result -> {
+            assertEquals(node2, result.newValue().leaderNodeId());
+            assertEquals(2, result.newValue().candidates().size());
+            assertEquals(node1, result.newValue().candidates().get(0));
+            assertEquals(node2, result.newValue().candidates().get(1));
+        }).join();
+        listener2.nextEvent().thenAccept(result -> {
+            assertEquals(node2, result.newValue().leaderNodeId());
+            assertEquals(2, result.newValue().candidates().size());
+            assertEquals(node1, result.newValue().candidates().get(0));
+            assertEquals(node2, result.newValue().candidates().get(1));
+        }).join();
+        listener3.nextEvent().thenAccept(result -> {
+            assertEquals(node2, result.newValue().leaderNodeId());
+            assertEquals(2, result.newValue().candidates().size());
+            assertEquals(node1, result.newValue().candidates().get(0));
+            assertEquals(node2, result.newValue().candidates().get(1));
+        }).join();
+    }
+
+    @Test
+    public void testLeaderSessionClose() throws Throwable {
+        leaderElectorLeaderSessionCloseTests(1);
+        clearTests();
+        leaderElectorLeaderSessionCloseTests(2);
+        clearTests();
+        leaderElectorLeaderSessionCloseTests(3);
+        clearTests();
+    }
+
+    private void leaderElectorLeaderSessionCloseTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        Atomix client1 = createAtomixClient();
+        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+        elector1.run("foo", node1).join();
+        Atomix client2 = createAtomixClient();
+        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+        LeaderEventListener listener = new LeaderEventListener();
+        elector2.run("foo", node2).join();
+        elector2.addChangeListener(listener).join();
+        client1.close();
+        listener.nextEvent().thenAccept(result -> {
+            assertEquals(node2, result.newValue().leaderNodeId());
+            assertEquals(1, result.newValue().candidates().size());
+            assertEquals(node2, result.newValue().candidates().get(0));
+        }).join();
+    }
+
+    @Test
+    public void testNonLeaderSessionClose() throws Throwable {
+        leaderElectorNonLeaderSessionCloseTests(1);
+        clearTests();
+        leaderElectorNonLeaderSessionCloseTests(2);
+        clearTests();
+        leaderElectorNonLeaderSessionCloseTests(3);
+        clearTests();
+    }
+
+    private void leaderElectorNonLeaderSessionCloseTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        Atomix client1 = createAtomixClient();
+        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+        elector1.run("foo", node1).join();
+        Atomix client2 = createAtomixClient();
+        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+        LeaderEventListener listener = new LeaderEventListener();
+        elector2.run("foo", node2).join();
+        elector1.addChangeListener(listener).join();
+        client2.close().join();
+        listener.nextEvent().thenAccept(result -> {
+            assertEquals(node1, result.newValue().leaderNodeId());
+            assertEquals(1, result.newValue().candidates().size());
+            assertEquals(node1, result.newValue().candidates().get(0));
+        }).join();
+    }
+
+    @Test
+    public void testQueries() throws Throwable {
+        leaderElectorQueryTests(1);
+        clearTests();
+        leaderElectorQueryTests(2);
+        clearTests();
+        leaderElectorQueryTests(3);
+        clearTests();
+    }
+
+    private void leaderElectorQueryTests(int numServers) throws Throwable {
+        createCopycatServers(numServers);
+        Atomix client1 = createAtomixClient();
+        Atomix client2 = createAtomixClient();
+        AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+        AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+        elector1.run("foo", node1).join();
+        elector2.run("foo", node2).join();
+        elector2.run("bar", node2).join();
+        elector1.getElectedTopics(node1).thenAccept(result -> {
+            assertEquals(1, result.size());
+            assertTrue(result.contains("foo"));
+        }).join();
+        elector2.getElectedTopics(node1).thenAccept(result -> {
+            assertEquals(1, result.size());
+            assertTrue(result.contains("foo"));
+        }).join();
+        elector1.getLeadership("foo").thenAccept(result -> {
+            assertEquals(node1, result.leaderNodeId());
+            assertEquals(node1, result.candidates().get(0));
+            assertEquals(node2, result.candidates().get(1));
+        }).join();
+        elector2.getLeadership("foo").thenAccept(result -> {
+            assertEquals(node1, result.leaderNodeId());
+            assertEquals(node1, result.candidates().get(0));
+            assertEquals(node2, result.candidates().get(1));
+        }).join();
+        elector1.getLeadership("bar").thenAccept(result -> {
+            assertEquals(node2, result.leaderNodeId());
+            assertEquals(node2, result.candidates().get(0));
+        }).join();
+        elector2.getLeadership("bar").thenAccept(result -> {
+            assertEquals(node2, result.leaderNodeId());
+            assertEquals(node2, result.candidates().get(0));
+        }).join();
+        elector1.getLeaderships().thenAccept(result -> {
+            assertEquals(2, result.size());
+            Leadership fooLeadership = result.get("foo");
+            assertEquals(node1, fooLeadership.leaderNodeId());
+            assertEquals(node1, fooLeadership.candidates().get(0));
+            assertEquals(node2, fooLeadership.candidates().get(1));
+            Leadership barLeadership = result.get("bar");
+            assertEquals(node2, barLeadership.leaderNodeId());
+            assertEquals(node2, barLeadership.candidates().get(0));
+        }).join();
+        elector2.getLeaderships().thenAccept(result -> {
+            assertEquals(2, result.size());
+            Leadership fooLeadership = result.get("foo");
+            assertEquals(node1, fooLeadership.leaderNodeId());
+            assertEquals(node1, fooLeadership.candidates().get(0));
+            assertEquals(node2, fooLeadership.candidates().get(1));
+            Leadership barLeadership = result.get("bar");
+            assertEquals(node2, barLeadership.leaderNodeId());
+            assertEquals(node2, barLeadership.candidates().get(0));
+        }).join();
+    }
+
+    private static class LeaderEventListener implements Consumer<Change<Leadership>> {
+        Queue<Change<Leadership>> eventQueue = new LinkedList<>();
+        CompletableFuture<Change<Leadership>> pendingFuture;
+
+        @Override
+        public void accept(Change<Leadership> change) {
+            synchronized (this) {
+                if (pendingFuture != null) {
+                    pendingFuture.complete(change);
+                    pendingFuture = null;
+                } else {
+                    eventQueue.add(change);
+                }
+            }
+        }
+
+        public boolean hasEvent() {
+            return !eventQueue.isEmpty();
+        }
+
+        public CompletableFuture<Change<Leadership>> nextEvent() {
+            synchronized (this) {
+                if (eventQueue.isEmpty()) {
+                    if (pendingFuture == null) {
+                        pendingFuture = new CompletableFuture<>();
+                    }
+                    return pendingFuture;
+                } else {
+                    return CompletableFuture.completedFuture(eventQueue.poll());
+                }
+            }
+        }
+    }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
new file mode 100644
index 0000000..d38400d
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLongTest.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import io.atomix.Atomix;
+import io.atomix.resource.ResourceType;
+import io.atomix.variables.DistributedLong;
+
+/**
+ * Unit tests for {@link AtomixCounter}.
+ */
+public class AtomixLongTest extends AtomixTestBase {
+
+    @Override
+    protected ResourceType resourceType() {
+        return new ResourceType(DistributedLong.class);
+    }
+
+    @Test
+    public void testBasicOperations() throws Throwable {
+        basicOperationsTest(1);
+        clearTests();
+        basicOperationsTest(2);
+        clearTests();
+        basicOperationsTest(3);
+        clearTests();
+    }
+
+    protected void basicOperationsTest(int clusterSize) throws Throwable {
+        createCopycatServers(clusterSize);
+        Atomix atomix = createAtomixClient();
+        AtomixCounter along = new AtomixCounter("test-long", atomix.getLong("test-long").join());
+        assertEquals(0, along.get().join().longValue());
+        assertEquals(1, along.incrementAndGet().join().longValue());
+        along.set(100).join();
+        assertEquals(100, along.get().join().longValue());
+        assertEquals(100, along.getAndAdd(10).join().longValue());
+        assertEquals(110, along.get().join().longValue());
+        assertFalse(along.compareAndSet(109, 111).join());
+        assertTrue(along.compareAndSet(110, 111).join());
+        assertEquals(100, along.addAndGet(-11).join().longValue());
+        assertEquals(100, along.getAndIncrement().join().longValue());
+        assertEquals(101, along.get().join().longValue());
+    }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
new file mode 100644
index 0000000..d655d52
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.Atomix;
+import io.atomix.AtomixClient;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.LocalServerRegistry;
+import io.atomix.catalyst.transport.LocalTransport;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.copycat.server.CopycatServer;
+import io.atomix.copycat.server.storage.Storage;
+import io.atomix.copycat.server.storage.StorageLevel;
+import io.atomix.manager.state.ResourceManagerState;
+import io.atomix.resource.ResourceRegistry;
+import io.atomix.resource.ResourceType;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.After;
+import org.junit.Before;
+import org.onosproject.store.primitives.impl.CatalystSerializers;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * Base class for various Atomix* tests.
+ */
+public abstract class AtomixTestBase {
+    private static final File TEST_DIR = new File("target/test-logs");
+    protected LocalServerRegistry registry;
+    protected int port;
+    protected List<Address> members;
+    protected List<CopycatClient> copycatClients = new ArrayList<>();
+    protected List<CopycatServer> copycatServers = new ArrayList<>();
+    protected List<Atomix> atomixClients = new ArrayList<>();
+    protected List<CopycatServer> atomixServers = new ArrayList<>();
+    protected Serializer serializer = CatalystSerializers.getSerializer();
+
+    /**
+     * Creates a new resource state machine.
+     *
+     * @return A new resource state machine.
+     */
+    protected abstract ResourceType resourceType();
+
+    /**
+     * Returns the next server address.
+     *
+     * @return The next server address.
+     */
+    private Address nextAddress() {
+        Address address = new Address("localhost", port++);
+        members.add(address);
+        return address;
+    }
+
+    /**
+     * Creates a set of Copycat servers.
+     */
+    protected List<CopycatServer> createCopycatServers(int nodes) throws Throwable {
+        CountDownLatch latch = new CountDownLatch(nodes);
+        List<CopycatServer> servers = new ArrayList<>();
+
+        List<Address> members = new ArrayList<>();
+        for (int i = 0; i < nodes; i++) {
+            members.add(nextAddress());
+        }
+
+        for (int i = 0; i < nodes; i++) {
+            CopycatServer server = createCopycatServer(members.get(i));
+            server.open().thenRun(latch::countDown);
+            servers.add(server);
+        }
+
+        Uninterruptibles.awaitUninterruptibly(latch);
+
+        return servers;
+    }
+
+    /**
+     * Creates a Copycat server.
+     */
+    protected CopycatServer createCopycatServer(Address address) {
+        ResourceRegistry resourceRegistry = new ResourceRegistry();
+        resourceRegistry.register(resourceType());
+        CopycatServer server = CopycatServer.builder(address, members)
+                .withTransport(new LocalTransport(registry))
+                .withStorage(Storage.builder()
+                        .withStorageLevel(StorageLevel.DISK)
+                        .withDirectory(TEST_DIR + "/" + address.port())
+                        .withSerializer(serializer.clone())
+                        .build())
+                .withStateMachine(() -> new ResourceManagerState(resourceRegistry))
+                .withSerializer(serializer.clone())
+                .withHeartbeatInterval(Duration.ofMillis(25))
+                .withElectionTimeout(Duration.ofMillis(50))
+                .withSessionTimeout(Duration.ofMillis(100))
+                .build();
+        copycatServers.add(server);
+        return server;
+    }
+
+    @Before
+    @After
+    public void clearTests() throws Exception {
+        registry = new LocalServerRegistry();
+        members = new ArrayList<>();
+        port = 5000;
+
+        CompletableFuture<Void> closeClients =
+                CompletableFuture.allOf(atomixClients.stream()
+                                                     .map(Atomix::close)
+                                                     .toArray(CompletableFuture[]::new));
+
+        closeClients.thenCompose(v -> CompletableFuture.allOf(copycatServers.stream()
+                .map(CopycatServer::close)
+                .toArray(CompletableFuture[]::new))).join();
+
+        deleteDirectory(TEST_DIR);
+
+        atomixClients = new ArrayList<>();
+
+        copycatServers = new ArrayList<>();
+    }
+
+    /**
+     * Deletes a directory recursively.
+     */
+    private void deleteDirectory(File directory) throws IOException {
+        if (directory.exists()) {
+            File[] files = directory.listFiles();
+            if (files != null) {
+                for (File file : files) {
+                    if (file.isDirectory()) {
+                        deleteDirectory(file);
+                    } else {
+                        Files.delete(file.toPath());
+                    }
+                }
+            }
+            Files.delete(directory.toPath());
+        }
+    }
+
+    /**
+     * Creates a Atomix client.
+     */
+    protected Atomix createAtomixClient() {
+        CountDownLatch latch = new CountDownLatch(1);
+        Atomix client = AtomixClient.builder(members)
+                .withTransport(new LocalTransport(registry))
+                .withSerializer(serializer.clone())
+                .withResourceResolver(r -> r.register(resourceType()))
+                .build();
+        client.open().thenRun(latch::countDown);
+        atomixClients.add(client);
+        Uninterruptibles.awaitUninterruptibly(latch);
+        return client;
+    }
+}