[ONOS-6594] Upgrade to Atomix 2.0.0

Change-Id: I6534bca1c8570b4e017f682953b876da29146675
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
deleted file mode 100644
index 2c4ee73..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.serializer.TypeSerializerFactory;
-import io.atomix.manager.util.ResourceManagerTypeResolver;
-import io.atomix.variables.internal.LongCommands;
-import org.onlab.util.Match;
-import org.onosproject.cluster.Leader;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.Change;
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapFactory;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapFactory;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapFactory;
-import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands;
-import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeFactory;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorFactory;
-import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands;
-import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueFactory;
-import org.onosproject.store.primitives.resources.impl.CommitResult;
-import org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult;
-import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
-import org.onosproject.store.primitives.resources.impl.PrepareResult;
-import org.onosproject.store.primitives.resources.impl.RollbackResult;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.DocumentPath;
-import org.onosproject.store.service.DocumentTreeEvent;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.TransactionLog;
-import org.onosproject.store.service.MultimapEvent;
-import org.onosproject.store.service.Task;
-import org.onosproject.store.service.Versioned;
-import org.onosproject.store.service.WorkQueueStats;
-
-import java.util.Arrays;
-import java.util.Optional;
-
-/**
- * Serializer utility for Atomix Catalyst.
- */
-public final class CatalystSerializers {
-
-    private CatalystSerializers() {
-    }
-
-    public static Serializer getSerializer() {
-        Serializer serializer = new Serializer();
-        TypeSerializerFactory factory =
-                new DefaultCatalystTypeSerializerFactory(
-                        org.onosproject.store.service.Serializer.using(Arrays.asList((KryoNamespaces.API)),
-                                                                       MapEntryUpdateResult.class,
-                                                                       MapEntryUpdateResult.Status.class,
-                                                                       Transaction.State.class,
-                                                                       PrepareResult.class,
-                                                                       CommitResult.class,
-                                                                       DocumentPath.class,
-                                                                       DocumentTreeUpdateResult.class,
-                                                                       DocumentTreeUpdateResult.Status.class,
-                                                                       DocumentTreeEvent.class,
-                                                                       DocumentTreeEvent.Type.class,
-                                                                       RollbackResult.class));
-        // ONOS classes
-        serializer.register(Change.class, factory);
-        serializer.register(Leader.class, factory);
-        serializer.register(Leadership.class, factory);
-        serializer.register(NodeId.class, factory);
-        serializer.register(Match.class, factory);
-        serializer.register(MapEntryUpdateResult.class, factory);
-        serializer.register(MapEntryUpdateResult.Status.class, factory);
-        serializer.register(Transaction.State.class, factory);
-        serializer.register(PrepareResult.class, factory);
-        serializer.register(CommitResult.class, factory);
-        serializer.register(RollbackResult.class, factory);
-        serializer.register(TransactionId.class, factory);
-        serializer.register(MapUpdate.class, factory);
-        serializer.register(MapUpdate.Type.class, factory);
-        serializer.register(TransactionLog.class, factory);
-        serializer.register(Versioned.class, factory);
-        serializer.register(MapEvent.class, factory);
-        serializer.register(MultimapEvent.class, factory);
-        serializer.register(MultimapEvent.Type.class, factory);
-        serializer.register(Task.class, factory);
-        serializer.register(WorkQueueStats.class, factory);
-        serializer.register(DocumentPath.class, factory);
-        serializer.register(DocumentTreeUpdateResult.class, factory);
-        serializer.register(DocumentTreeUpdateResult.Status.class, factory);
-        serializer.register(DocumentTreeEvent.class, factory);
-        serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
-        serializer.register(ImmutableList.of().getClass(), factory);
-        serializer.register(ImmutableList.of("a").getClass(), factory);
-        serializer.register(Arrays.asList().getClass(), factory);
-        serializer.register(HashMultiset.class, factory);
-        serializer.register(Optional.class, factory);
-
-        serializer.resolve(new LongCommands.TypeResolver());
-        serializer.resolve(new AtomixConsistentMapCommands.TypeResolver());
-        serializer.resolve(new AtomixLeaderElectorCommands.TypeResolver());
-        serializer.resolve(new AtomixWorkQueueCommands.TypeResolver());
-        serializer.resolve(new AtomixDocumentTreeCommands.TypeResolver());
-        serializer.resolve(new ResourceManagerTypeResolver());
-        serializer.resolve(new AtomixConsistentTreeMapCommands.TypeResolver());
-        serializer.resolve(new AtomixConsistentMultimapCommands.TypeResolver());
-
-        serializer.registerClassLoader(AtomixConsistentMapFactory.class)
-                .registerClassLoader(AtomixLeaderElectorFactory.class)
-                .registerClassLoader(AtomixWorkQueueFactory.class)
-                .registerClassLoader(AtomixDocumentTreeFactory.class)
-                .registerClassLoader(AtomixConsistentTreeMapFactory.class)
-                .registerClassLoader(AtomixConsistentSetMultimapFactory.class);
-
-        return serializer;
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
deleted file mode 100644
index fc94dd6..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Client;
-import io.atomix.catalyst.transport.Server;
-import io.atomix.catalyst.transport.Transport;
-import org.onlab.packet.IpAddress;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingService;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Map;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Copycat transport implementation built on {@link MessagingService}.
- */
-public class CopycatTransport implements Transport {
-    private final PartitionId partitionId;
-    private final MessagingService messagingService;
-    private static final Map<Address, Endpoint> EP_LOOKUP_CACHE = Maps.newConcurrentMap();
-
-    static final byte MESSAGE = 0x01;
-    static final byte CONNECT = 0x02;
-    static final byte CLOSE = 0x03;
-
-    static final byte SUCCESS = 0x01;
-    static final byte FAILURE = 0x02;
-
-    public CopycatTransport(PartitionId partitionId, MessagingService messagingService) {
-        this.partitionId = checkNotNull(partitionId, "partitionId cannot be null");
-        this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
-    }
-
-    @Override
-    public Client client() {
-        return new CopycatTransportClient(partitionId, messagingService);
-    }
-
-    @Override
-    public Server server() {
-        return new CopycatTransportServer(partitionId, messagingService);
-    }
-
-    @Override
-    public String toString() {
-        return toStringHelper(this).toString();
-    }
-
-    /**
-     * Maps {@link Address address} to {@link Endpoint endpoint}.
-     * @param address address
-     * @return end point
-     */
-    static Endpoint toEndpoint(Address address) {
-        return EP_LOOKUP_CACHE.computeIfAbsent(address, a -> {
-            try {
-                return new Endpoint(IpAddress.valueOf(InetAddress.getByName(a.host())), a.port());
-            } catch (UnknownHostException e) {
-                Throwables.propagate(e);
-                return null;
-            }
-        });
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
deleted file mode 100644
index afa98cc..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.Sets;
-import io.atomix.catalyst.concurrent.ThreadContext;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Client;
-import io.atomix.catalyst.transport.Connection;
-import io.atomix.catalyst.transport.TransportException;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingException;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.ConnectException;
-import java.nio.ByteBuffer;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.store.primitives.impl.CopycatTransport.CONNECT;
-import static org.onosproject.store.primitives.impl.CopycatTransport.SUCCESS;
-
-/**
- * Copycat transport client implementation.
- */
-public class CopycatTransportClient implements Client {
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final PartitionId partitionId;
-    private final String serverSubject;
-    private final MessagingService messagingService;
-    private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
-
-    public CopycatTransportClient(PartitionId partitionId, MessagingService messagingService) {
-        this.partitionId = checkNotNull(partitionId, "partitionId cannot be null");
-        this.serverSubject = String.format("onos-copycat-%s", partitionId);
-        this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
-    }
-
-    @Override
-    public CompletableFuture<Connection> connect(Address address) {
-        CompletableFuture<Connection> future = new CompletableFuture<>();
-        ThreadContext context = ThreadContext.currentContextOrThrow();
-        Endpoint endpoint = CopycatTransport.toEndpoint(address);
-
-        log.debug("Connecting to {}", address);
-
-        ByteBuffer requestBuffer = ByteBuffer.allocate(1);
-        requestBuffer.put(CONNECT);
-
-        // Send a connect request to the server to get a unique connection ID.
-        messagingService.sendAndReceive(endpoint, serverSubject, requestBuffer.array(), context.executor())
-                .whenComplete((payload, error) -> {
-                    Throwable wrappedError = error;
-                    if (error != null) {
-                        Throwable rootCause = Throwables.getRootCause(error);
-                        if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
-                            wrappedError = new TransportException(error);
-                        }
-                        // TODO ONOS-6788 we might consider demoting this warning during startup when there is
-                        //      a race between the server registering handlers and the client sending messages
-                        log.warn("Connection to {} failed! Reason: {}", address, wrappedError);
-                        future.completeExceptionally(wrappedError);
-                    } else {
-                        // If the connection is successful, the server will send back a
-                        // connection ID indicating where to send messages for the connection.
-                        ByteBuffer responseBuffer = ByteBuffer.wrap(payload);
-                        if (responseBuffer.get() == SUCCESS) {
-                            long connectionId = responseBuffer.getLong();
-                            CopycatTransportConnection connection = new CopycatTransportConnection(
-                                    connectionId,
-                                    CopycatTransportConnection.Mode.CLIENT,
-                                    partitionId,
-                                    endpoint,
-                                    messagingService,
-                                    context);
-                            connection.onClose(connections::remove);
-                            connections.add(connection);
-                            future.complete(connection);
-                            log.debug("Created connection {}-{} to {}", partitionId, connectionId, address);
-                        } else {
-                            log.warn("Connection to {} failed!");
-                            future.completeExceptionally(new ConnectException());
-                        }
-                    }
-                });
-        return future;
-    }
-
-    @Override
-    public CompletableFuture<Void> close() {
-        return CompletableFuture.allOf(connections.stream().map(Connection::close).toArray(CompletableFuture[]::new));
-    }
-
-    @Override
-    public String toString() {
-        return toStringHelper(this)
-                .add("partitionId", partitionId)
-                .toString();
-    }
-}
-
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
deleted file mode 100644
index a3a8539..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import com.google.common.base.Throwables;
-import io.atomix.catalyst.concurrent.Listener;
-import io.atomix.catalyst.concurrent.Listeners;
-import io.atomix.catalyst.concurrent.ThreadContext;
-import io.atomix.catalyst.serializer.SerializationException;
-import io.atomix.catalyst.transport.Connection;
-import io.atomix.catalyst.transport.TransportException;
-import io.atomix.catalyst.util.reference.ReferenceCounted;
-import org.apache.commons.io.IOUtils;
-import org.onlab.util.Tools;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.MessagingException;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.store.primitives.impl.CopycatTransport.CLOSE;
-import static org.onosproject.store.primitives.impl.CopycatTransport.FAILURE;
-import static org.onosproject.store.primitives.impl.CopycatTransport.MESSAGE;
-import static org.onosproject.store.primitives.impl.CopycatTransport.SUCCESS;
-
-/**
- * Base Copycat Transport connection.
- */
-public class CopycatTransportConnection implements Connection {
-    private static final int MAX_MESSAGE_SIZE = 1024 * 1024;
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final long connectionId;
-    private final String localSubject;
-    private final String remoteSubject;
-    private final PartitionId partitionId;
-    private final Endpoint endpoint;
-    private final MessagingService messagingService;
-    private final ThreadContext context;
-    private final Map<Class, InternalHandler> handlers = new ConcurrentHashMap<>();
-    private final Listeners<Throwable> exceptionListeners = new Listeners<>();
-    private final Listeners<Connection> closeListeners = new Listeners<>();
-
-    CopycatTransportConnection(
-            long connectionId,
-            Mode mode,
-            PartitionId partitionId,
-            Endpoint endpoint,
-            MessagingService messagingService,
-            ThreadContext context) {
-        this.connectionId = connectionId;
-        this.partitionId = checkNotNull(partitionId, "partitionId cannot be null");
-        this.localSubject = mode.getLocalSubject(partitionId, connectionId);
-        this.remoteSubject = mode.getRemoteSubject(partitionId, connectionId);
-        this.endpoint = checkNotNull(endpoint, "endpoint cannot be null");
-        this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
-        this.context = checkNotNull(context, "context cannot be null");
-        messagingService.registerHandler(localSubject, this::handle);
-    }
-
-    @Override
-    public CompletableFuture<Void> send(Object message) {
-        ThreadContext context = ThreadContext.currentContextOrThrow();
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            DataOutputStream dos = new DataOutputStream(baos);
-            dos.writeByte(MESSAGE);
-            context.serializer().writeObject(message, baos);
-            if (message instanceof ReferenceCounted) {
-                ((ReferenceCounted<?>) message).release();
-            }
-
-            byte[] bytes = baos.toByteArray();
-            if (bytes.length > MAX_MESSAGE_SIZE) {
-                throw new IllegalArgumentException(message + " exceeds maximum message size " + MAX_MESSAGE_SIZE);
-            }
-            messagingService.sendAsync(endpoint, remoteSubject, bytes)
-                    .whenComplete((r, e) -> {
-                        if (e != null) {
-                            context.executor().execute(() -> future.completeExceptionally(e));
-                        } else {
-                            context.executor().execute(() -> future.complete(null));
-                        }
-                    });
-        } catch (SerializationException | IOException e) {
-            future.completeExceptionally(e);
-        }
-        return future;
-    }
-
-    @Override
-    public <T, U> CompletableFuture<U> sendAndReceive(T message) {
-        ThreadContext context = ThreadContext.currentContextOrThrow();
-        CompletableFuture<U> future = new CompletableFuture<>();
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            DataOutputStream dos = new DataOutputStream(baos);
-            dos.writeByte(MESSAGE);
-            context.serializer().writeObject(message, baos);
-            if (message instanceof ReferenceCounted) {
-                ((ReferenceCounted<?>) message).release();
-            }
-
-            byte[] bytes = baos.toByteArray();
-            if (bytes.length > MAX_MESSAGE_SIZE) {
-                throw new IllegalArgumentException(message + " exceeds maximum message size " + MAX_MESSAGE_SIZE);
-            }
-            messagingService.sendAndReceive(endpoint,
-                                            remoteSubject,
-                                            bytes,
-                                            context.executor())
-                    .whenComplete((response, error) -> handleResponse(response, error, future));
-        } catch (SerializationException | IOException e) {
-            future.completeExceptionally(e);
-        }
-        return future;
-    }
-
-    /**
-     * Handles a response received from the other side of the connection.
-     */
-    private <T> void handleResponse(
-            byte[] response,
-            Throwable error,
-            CompletableFuture<T> future) {
-        if (error != null) {
-            Throwable rootCause = Throwables.getRootCause(error);
-            if (rootCause instanceof MessagingException.NoRemoteHandler) {
-                future.completeExceptionally(new TransportException(error));
-                close(rootCause);
-            } else if (rootCause instanceof SocketException) {
-                future.completeExceptionally(new TransportException(error));
-            } else {
-                future.completeExceptionally(error);
-            }
-            return;
-        }
-
-        checkNotNull(response);
-        InputStream input = new ByteArrayInputStream(response);
-        try {
-            byte status = (byte) input.read();
-            if (status == FAILURE) {
-                Throwable t = context.serializer().readObject(input);
-                future.completeExceptionally(t);
-            } else {
-                try {
-                    future.complete(context.serializer().readObject(input));
-                } catch (SerializationException e) {
-                    future.completeExceptionally(e);
-                }
-            }
-        } catch (IOException e) {
-            future.completeExceptionally(e);
-        }
-    }
-
-    /**
-     * Handles a message sent to the connection.
-     */
-    private CompletableFuture<byte[]> handle(Endpoint sender, byte[] payload) {
-        try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
-            byte type = input.readByte();
-            switch (type) {
-                case MESSAGE:
-                    return handleMessage(IOUtils.toByteArray(input));
-                case CLOSE:
-                    return handleClose();
-                default:
-                    throw new IllegalStateException("Invalid message type");
-            }
-        } catch (IOException e) {
-            Throwables.propagate(e);
-            return null;
-        }
-    }
-
-    /**
-     * Handles a message from the other side of the connection.
-     */
-    @SuppressWarnings("unchecked")
-    private CompletableFuture<byte[]> handleMessage(byte[] message) {
-        try {
-            Object request = context.serializer().readObject(new ByteArrayInputStream(message));
-            InternalHandler handler = handlers.get(request.getClass());
-            if (handler == null) {
-                log.warn("No handler registered on connection {}-{} for type {}",
-                         partitionId, connectionId, request.getClass());
-                return Tools.exceptionalFuture(new IllegalStateException(
-                        "No handler registered for " + request.getClass()));
-            }
-
-            return handler.handle(request).handle((result, error) -> {
-                try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-                    baos.write(error != null ? FAILURE : SUCCESS);
-                    context.serializer().writeObject(error != null ? error : result, baos);
-                    byte[] bytes = baos.toByteArray();
-                    if (bytes.length > MAX_MESSAGE_SIZE) {
-                        throw new IllegalArgumentException("response exceeds maximum message size " + MAX_MESSAGE_SIZE);
-                    }
-                    return bytes;
-                } catch (IOException e) {
-                    Throwables.propagate(e);
-                    return null;
-                }
-            });
-        } catch (Exception e) {
-            return Tools.exceptionalFuture(e);
-        }
-    }
-
-    /**
-     * Handles a close request from the other side of the connection.
-     */
-    private CompletableFuture<byte[]> handleClose() {
-        CompletableFuture<byte[]> future = new CompletableFuture<>();
-        context.executor().execute(() -> {
-            close(null);
-            ByteBuffer responseBuffer = ByteBuffer.allocate(1);
-            responseBuffer.put(SUCCESS);
-            future.complete(responseBuffer.array());
-        });
-        return future;
-    }
-
-    @Override
-    public <T, U> Connection handler(Class<T> type, Consumer<T> handler) {
-        return handler(type, r -> {
-            handler.accept(r);
-            return null;
-        });
-    }
-
-    @Override
-    public <T, U> Connection handler(Class<T> type, Function<T, CompletableFuture<U>> handler) {
-        if (log.isTraceEnabled()) {
-            log.trace("Registered handler on connection {}-{}: {}", partitionId, connectionId, type);
-        }
-        handlers.put(type, new InternalHandler(handler, ThreadContext.currentContextOrThrow()));
-        return this;
-    }
-
-    @Override
-    public Listener<Throwable> onException(Consumer<Throwable> consumer) {
-        return exceptionListeners.add(consumer);
-    }
-
-    @Override
-    public Listener<Connection> onClose(Consumer<Connection> consumer) {
-        return closeListeners.add(consumer);
-    }
-
-    @Override
-    public CompletableFuture<Void> close() {
-        log.debug("Closing connection {}-{}", partitionId, connectionId);
-
-        ByteBuffer requestBuffer = ByteBuffer.allocate(1);
-        requestBuffer.put(CLOSE);
-
-        ThreadContext context = ThreadContext.currentContextOrThrow();
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        messagingService.sendAndReceive(endpoint, remoteSubject, requestBuffer.array(), context.executor())
-                .whenComplete((payload, error) -> {
-                    close(error);
-                    Throwable wrappedError = error;
-                    if (error != null) {
-                        Throwable rootCause = Throwables.getRootCause(error);
-                        if (rootCause instanceof MessagingException.NoRemoteHandler) {
-                            wrappedError = new TransportException(error);
-                        }
-                        future.completeExceptionally(wrappedError);
-                    } else {
-                        ByteBuffer responseBuffer = ByteBuffer.wrap(payload);
-                        if (responseBuffer.get() == SUCCESS) {
-                            future.complete(null);
-                        } else {
-                            future.completeExceptionally(new TransportException("Failed to close connection"));
-                        }
-                    }
-                });
-        return future;
-    }
-
-    /**
-     * Cleans up the connection, unregistering handlers registered on the MessagingService.
-     */
-    private void close(Throwable error) {
-        log.debug("Connection {}-{} closed", partitionId, connectionId);
-        messagingService.unregisterHandler(localSubject);
-        if (error != null) {
-            exceptionListeners.accept(error);
-        }
-        closeListeners.accept(this);
-    }
-
-    /**
-     * Connection mode used to indicate whether this side of the connection is
-     * a client or server.
-     */
-    enum Mode {
-
-        /**
-         * Represents the client side of a bi-directional connection.
-         */
-        CLIENT {
-            @Override
-            String getLocalSubject(PartitionId partitionId, long connectionId) {
-                return String.format("onos-copycat-%s-%d-client", partitionId, connectionId);
-            }
-
-            @Override
-            String getRemoteSubject(PartitionId partitionId, long connectionId) {
-                return String.format("onos-copycat-%s-%d-server", partitionId, connectionId);
-            }
-        },
-
-        /**
-         * Represents the server side of a bi-directional connection.
-         */
-        SERVER {
-            @Override
-            String getLocalSubject(PartitionId partitionId, long connectionId) {
-                return String.format("onos-copycat-%s-%d-server", partitionId, connectionId);
-            }
-
-            @Override
-            String getRemoteSubject(PartitionId partitionId, long connectionId) {
-                return String.format("onos-copycat-%s-%d-client", partitionId, connectionId);
-            }
-        };
-
-        /**
-         * Returns the local messaging service subject for the connection in this mode.
-         * Subjects generated by the connection mode are guaranteed to be globally unique.
-         *
-         * @param partitionId the partition ID to which the connection belongs.
-         * @param connectionId the connection ID.
-         * @return the globally unique local subject for the connection.
-         */
-        abstract String getLocalSubject(PartitionId partitionId, long connectionId);
-
-        /**
-         * Returns the remote messaging service subject for the connection in this mode.
-         * Subjects generated by the connection mode are guaranteed to be globally unique.
-         *
-         * @param partitionId the partition ID to which the connection belongs.
-         * @param connectionId the connection ID.
-         * @return the globally unique remote subject for the connection.
-         */
-        abstract String getRemoteSubject(PartitionId partitionId, long connectionId);
-    }
-
-    /**
-     * Internal container for a handler/context pair.
-     */
-    private static class InternalHandler {
-        private final Function handler;
-        private final ThreadContext context;
-
-        InternalHandler(Function handler, ThreadContext context) {
-            this.handler = handler;
-            this.context = context;
-        }
-
-        @SuppressWarnings("unchecked")
-        CompletableFuture<Object> handle(Object message) {
-            CompletableFuture<Object> future = new CompletableFuture<>();
-            context.executor().execute(() -> {
-                CompletableFuture<Object> responseFuture = (CompletableFuture<Object>) handler.apply(message);
-                if (responseFuture != null) {
-                    responseFuture.whenComplete((r, e) -> {
-                        if (e != null) {
-                            future.completeExceptionally((Throwable) e);
-                        } else {
-                            future.complete(r);
-                        }
-                    });
-                }
-            });
-            return future;
-        }
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
deleted file mode 100644
index 8de05a3..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import com.google.common.collect.Sets;
-import io.atomix.catalyst.concurrent.ThreadContext;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Connection;
-import io.atomix.catalyst.transport.Server;
-import org.apache.commons.lang3.RandomUtils;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.store.primitives.impl.CopycatTransport.CONNECT;
-import static org.onosproject.store.primitives.impl.CopycatTransport.FAILURE;
-import static org.onosproject.store.primitives.impl.CopycatTransport.SUCCESS;
-
-/**
- * Copycat transport server implementation.
- */
-public class CopycatTransportServer implements Server {
-    private final Logger log = LoggerFactory.getLogger(getClass());
-    private final PartitionId partitionId;
-    private final String serverSubject;
-    private final MessagingService messagingService;
-    private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
-
-    public CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
-        this.partitionId = checkNotNull(partitionId, "partitionId cannot be null");
-        this.serverSubject = String.format("onos-copycat-%s", partitionId);
-        this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
-    }
-
-    @Override
-    public CompletableFuture<Void> listen(Address address, Consumer<Connection> consumer) {
-        ThreadContext context = ThreadContext.currentContextOrThrow();
-        messagingService.registerHandler(serverSubject, (sender, payload) -> {
-
-            // Only connect messages can be sent to the server. Once a connect message
-            // is received, the connection will register a separate handler for messaging.
-            ByteBuffer requestBuffer = ByteBuffer.wrap(payload);
-            if (requestBuffer.get() != CONNECT) {
-                ByteBuffer responseBuffer = ByteBuffer.allocate(1);
-                responseBuffer.put(FAILURE);
-                return CompletableFuture.completedFuture(responseBuffer.array());
-            }
-
-            // Create the connection and ensure state is cleaned up when the connection is closed.
-            long connectionId = RandomUtils.nextLong();
-            CopycatTransportConnection connection = new CopycatTransportConnection(
-                    connectionId,
-                    CopycatTransportConnection.Mode.SERVER,
-                    partitionId,
-                    sender,
-                    messagingService,
-                    context);
-            connection.onClose(connections::remove);
-            connections.add(connection);
-
-            CompletableFuture<byte[]> future = new CompletableFuture<>();
-
-            // We need to ensure the connection event is called on the Copycat thread
-            // and that the future is not completed until the Copycat server has been
-            // able to register message handlers, otherwise some messages can be received
-            // prior to any handlers being registered.
-            context.executor().execute(() -> {
-                log.debug("Created connection {}-{}", partitionId, connectionId);
-                consumer.accept(connection);
-
-                ByteBuffer responseBuffer = ByteBuffer.allocate(9);
-                responseBuffer.put(SUCCESS);
-                responseBuffer.putLong(connectionId);
-                future.complete(responseBuffer.array());
-            });
-            return future;
-        });
-        return CompletableFuture.completedFuture(null);
-    }
-
-    @Override
-    public CompletableFuture<Void> close() {
-        return CompletableFuture.allOf(connections.stream().map(Connection::close).toArray(CompletableFuture[]::new));
-    }
-
-    @Override
-    public String toString() {
-        return toStringHelper(this)
-                .add("partitionId", partitionId)
-                .toString();
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java
index a189eb2..45cf193 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java
@@ -32,6 +32,6 @@
 
     @Override
     public AsyncAtomicCounter build() {
-        return primitiveCreator.newAsyncCounter(name(), executorSupplier());
+        return primitiveCreator.newAsyncCounter(name());
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java
index c92ef22..309220a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java
@@ -33,7 +33,7 @@
 
     @Override
     public AsyncAtomicCounterMap<K> buildAsyncMap() {
-        return primitiveCreator.newAsyncAtomicCounterMap(name(), serializer(), executorSupplier());
+        return primitiveCreator.newAsyncAtomicCounterMap(name(), serializer());
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicIdGeneratorBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicIdGeneratorBuilder.java
index ac294f4..c6742cb 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicIdGeneratorBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicIdGeneratorBuilder.java
@@ -32,6 +32,6 @@
 
     @Override
     public AsyncAtomicIdGenerator build() {
-        return primitiveCreator.newAsyncIdGenerator(name(), executorSupplier());
+        return primitiveCreator.newAsyncIdGenerator(name());
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
index a63fe4be..b17983c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import java.util.concurrent.Executor;
 import java.util.function.Supplier;
 
 import org.onosproject.store.service.AsyncAtomicValue;
@@ -38,12 +37,6 @@
     }
 
     @Override
-    public AtomicValueBuilder<V> withExecutorSupplier(Supplier<Executor> executorSupplier) {
-        mapBuilder.withExecutorSupplier(executorSupplier);
-        return this;
-    }
-
-    @Override
     public AsyncAtomicValue<V> build() {
         return new DefaultAsyncAtomicValue<>(checkNotNull(name()),
                                              checkNotNull(serializer()),
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
deleted file mode 100644
index b0d6841..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
-import org.onosproject.store.service.Serializer;
-import org.slf4j.Logger;
-
-import com.google.common.base.Throwables;
-
-import io.atomix.catalyst.buffer.BufferInput;
-import io.atomix.catalyst.buffer.BufferOutput;
-import io.atomix.catalyst.serializer.TypeSerializer;
-import io.atomix.catalyst.serializer.TypeSerializerFactory;
-
-/**
- * {@link TypeSerializerFactory} for providing {@link TypeSerializer}s based on
- * {@code org.onosproject.store.service.Serializer}.
- */
-public class DefaultCatalystTypeSerializerFactory implements TypeSerializerFactory {
-
-    private final Logger log = getLogger(getClass());
-    private final TypeSerializer<?> typeSerializer;
-
-    public DefaultCatalystTypeSerializerFactory(Serializer serializer) {
-        typeSerializer = new InternalSerializer<>(serializer);
-    }
-
-    @Override
-    public TypeSerializer<?> createSerializer(Class<?> clazz) {
-        return typeSerializer;
-    }
-
-    private class InternalSerializer<T> implements TypeSerializer<T> {
-
-        private final Serializer serializer;
-
-        InternalSerializer(Serializer serializer) {
-            this.serializer = serializer;
-        }
-
-        @Override
-        public void write(T object, BufferOutput buffer, io.atomix.catalyst.serializer.Serializer serializer) {
-            try {
-                byte[] payload = this.serializer.encode(object);
-                buffer.writeInt(payload.length);
-                buffer.write(payload);
-            } catch (Exception e) {
-                log.warn("Failed to serialize {}", object, e);
-                throw Throwables.propagate(e);
-            }
-        }
-
-        @Override
-        public T read(Class<T> type, BufferInput buffer, io.atomix.catalyst.serializer.Serializer serializer) {
-            int size = buffer.readInt();
-            try {
-                byte[] payload = new byte[size];
-                buffer.read(payload);
-                return this.serializer.decode(payload);
-            } catch (Exception e) {
-                log.warn("Failed to deserialize as type {}. Payload size: {}", type, size, e);
-                throw Throwables.propagate(e);
-            }
-        }
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
index 820174d..b5284de 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
@@ -41,7 +41,7 @@
 
     @Override
     public AsyncConsistentMap<K, V> buildAsyncMap() {
-        AsyncConsistentMap<K, V> map = primitiveCreator.newAsyncConsistentMap(name(), serializer(), executorSupplier());
+        AsyncConsistentMap<K, V> map = primitiveCreator.newAsyncConsistentMap(name(), serializer());
         map = relaxedReadConsistency() ? DistributedPrimitives.newCachingMap(map) : map;
         map = readOnly() ? DistributedPrimitives.newUnmodifiableMap(map) : map;
         return meteringEnabled() ? DistributedPrimitives.newMeteredMap(map) : map;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMultimapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMultimapBuilder.java
index ba7d673..4b23253 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMultimapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMultimapBuilder.java
@@ -36,7 +36,7 @@
 
     @Override
     public AsyncConsistentMultimap<K, V> buildMultimap() {
-        return primitiveCreator.newAsyncConsistentSetMultimap(name(), serializer(), executorSupplier());
+        return primitiveCreator.newAsyncConsistentSetMultimap(name(), serializer());
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentTreeMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentTreeMapBuilder.java
index 5e2a8b4..2aa906a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentTreeMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentTreeMapBuilder.java
@@ -35,7 +35,7 @@
 
     @Override
     public AsyncConsistentTreeMap<V> buildTreeMap() {
-        return primitiveCreator.newAsyncConsistentTreeMap(name(), serializer(), executorSupplier());
+        return primitiveCreator.newAsyncConsistentTreeMap(name(), serializer());
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedSetBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedSetBuilder.java
index 5e95180..c17f91d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedSetBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedSetBuilder.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import java.util.concurrent.Executor;
 import java.util.function.Supplier;
 
 import org.onosproject.core.ApplicationId;
@@ -54,12 +53,6 @@
     }
 
     @Override
-    public DistributedSetBuilder<E> withExecutorSupplier(Supplier<Executor> executorSupplier) {
-        mapBuilder.withExecutorSupplier(executorSupplier);
-        return this;
-    }
-
-    @Override
     public DistributedSetBuilder<E> withPurgeOnUninstall() {
         mapBuilder.withPurgeOnUninstall();
         return this;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java
index 65c0504..8d21e60 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java
@@ -47,6 +47,6 @@
     @Deprecated
     @Override
     public AsyncDocumentTree<V> build() {
-        return primitiveCreator.newAsyncDocumentTree(name(), serializer(), executorSupplier());
+        return primitiveCreator.newAsyncDocumentTree(name(), serializer());
     }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
index 6f8f55d..69788f9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
@@ -32,6 +32,6 @@
 
     @Override
     public AsyncLeaderElector build() {
-        return primitiveCreator.newAsyncLeaderElector(name(), executorSupplier());
+        return primitiveCreator.newAsyncLeaderElector(name());
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingCopycatClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingCopycatClient.java
deleted file mode 100644
index 8955a6d..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingCopycatClient.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import io.atomix.catalyst.concurrent.Listener;
-import io.atomix.catalyst.concurrent.ThreadContext;
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Transport;
-import io.atomix.copycat.Command;
-import io.atomix.copycat.Query;
-import io.atomix.copycat.client.CopycatClient;
-import io.atomix.copycat.session.Session;
-
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-
-/**
- * {@code CopycatClient} that merely delegates control to
- * another CopycatClient.
- */
-public class DelegatingCopycatClient implements CopycatClient {
-
-    protected final CopycatClient client;
-
-    DelegatingCopycatClient(CopycatClient client) {
-        this.client = client;
-    }
-
-    @Override
-    public State state() {
-        return client.state();
-    }
-
-    @Override
-    public Listener<State> onStateChange(Consumer<State> callback) {
-        return client.onStateChange(callback);
-    }
-
-    @Override
-    public ThreadContext context() {
-        return client.context();
-    }
-
-    @Override
-    public Transport transport() {
-        return client.transport();
-    }
-
-    @Override
-    public Serializer serializer() {
-        return client.serializer();
-    }
-
-    @Override
-    public Session session() {
-        return client.session();
-    }
-
-    @Override
-    public <T> CompletableFuture<T> submit(Command<T> command) {
-        return client.submit(command);
-    }
-
-    @Override
-    public <T> CompletableFuture<T> submit(Query<T> query) {
-        return client.submit(query);
-    }
-
-    @Override
-    public Listener<Void> onEvent(String event, Runnable callback) {
-        return client.onEvent(event, callback);
-    }
-
-    @Override
-    public <T> Listener<T> onEvent(String event, Consumer<T> callback) {
-        return client.onEvent(event, callback);
-    }
-
-    @Override
-    public CompletableFuture<CopycatClient> connect(Collection<Address> members) {
-        return client.connect(members);
-    }
-
-    @Override
-    public CompletableFuture<CopycatClient> recover() {
-        return client.recover();
-    }
-
-    @Override
-    public CompletableFuture<Void> close() {
-        return client.close();
-    }
-}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounter.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounter.java
deleted file mode 100644
index 504fa75..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import org.onosproject.store.service.AsyncAtomicCounter;
-
-/**
- * {@link AsyncAtomicCounter} that executes asynchronous callbacks on a user provided
- * {@link Executor}.
- */
-public class ExecutingAsyncAtomicCounter extends ExecutingDistributedPrimitive implements AsyncAtomicCounter {
-    private final AsyncAtomicCounter delegateCounter;
-
-    public ExecutingAsyncAtomicCounter(
-            AsyncAtomicCounter delegateCounter, Executor orderedExecutor, Executor threadPoolExecutor) {
-        super(delegateCounter, orderedExecutor, threadPoolExecutor);
-        this.delegateCounter = delegateCounter;
-    }
-
-    @Override
-    public CompletableFuture<Long> incrementAndGet() {
-        return asyncFuture(delegateCounter.incrementAndGet());
-    }
-
-    @Override
-    public CompletableFuture<Long> getAndIncrement() {
-        return asyncFuture(delegateCounter.getAndIncrement());
-    }
-
-    @Override
-    public CompletableFuture<Long> getAndAdd(long delta) {
-        return asyncFuture(delegateCounter.getAndAdd(delta));
-    }
-
-    @Override
-    public CompletableFuture<Long> addAndGet(long delta) {
-        return asyncFuture(delegateCounter.addAndGet(delta));
-    }
-
-    @Override
-    public CompletableFuture<Long> get() {
-        return asyncFuture(delegateCounter.get());
-    }
-
-    @Override
-    public CompletableFuture<Void> set(long value) {
-        return asyncFuture(delegateCounter.set(value));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) {
-        return asyncFuture(delegateCounter.compareAndSet(expectedValue, updateValue));
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounterMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounterMap.java
deleted file mode 100644
index a17a2f0..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounterMap.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import org.onosproject.store.service.AsyncAtomicCounterMap;
-
-/**
- * {@link org.onosproject.store.service.AsyncAtomicCounterMap} that executes asynchronous callbacks on a user provided
- * {@link Executor}.
- */
-public class ExecutingAsyncAtomicCounterMap<K>
-        extends ExecutingDistributedPrimitive implements AsyncAtomicCounterMap<K> {
-    private final AsyncAtomicCounterMap<K> delegateMap;
-
-    public ExecutingAsyncAtomicCounterMap(
-            AsyncAtomicCounterMap<K> delegateMap, Executor orderedExecutor, Executor threadPoolExecutor) {
-        super(delegateMap, orderedExecutor, threadPoolExecutor);
-        this.delegateMap = delegateMap;
-    }
-
-    @Override
-    public CompletableFuture<Long> incrementAndGet(K key) {
-        return asyncFuture(delegateMap.incrementAndGet(key));
-    }
-
-    @Override
-    public CompletableFuture<Long> decrementAndGet(K key) {
-        return asyncFuture(delegateMap.decrementAndGet(key));
-    }
-
-    @Override
-    public CompletableFuture<Long> getAndIncrement(K key) {
-        return asyncFuture(delegateMap.getAndIncrement(key));
-    }
-
-    @Override
-    public CompletableFuture<Long> getAndDecrement(K key) {
-        return asyncFuture(delegateMap.getAndDecrement(key));
-    }
-
-    @Override
-    public CompletableFuture<Long> addAndGet(K key, long delta) {
-        return asyncFuture(delegateMap.addAndGet(key, delta));
-    }
-
-    @Override
-    public CompletableFuture<Long> getAndAdd(K key, long delta) {
-        return asyncFuture(delegateMap.getAndAdd(key, delta));
-    }
-
-    @Override
-    public CompletableFuture<Long> get(K key) {
-        return asyncFuture(delegateMap.get(key));
-    }
-
-    @Override
-    public CompletableFuture<Long> put(K key, long newValue) {
-        return asyncFuture(delegateMap.put(key, newValue));
-    }
-
-    @Override
-    public CompletableFuture<Long> putIfAbsent(K key, long newValue) {
-        return asyncFuture(delegateMap.putIfAbsent(key, newValue));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> replace(K key, long expectedOldValue, long newValue) {
-        return asyncFuture(delegateMap.replace(key, expectedOldValue, newValue));
-    }
-
-    @Override
-    public CompletableFuture<Long> remove(K key) {
-        return asyncFuture(delegateMap.remove(key));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> remove(K key, long value) {
-        return asyncFuture(delegateMap.remove(key, value));
-    }
-
-    @Override
-    public CompletableFuture<Integer> size() {
-        return asyncFuture(delegateMap.size());
-    }
-
-    @Override
-    public CompletableFuture<Boolean> isEmpty() {
-        return asyncFuture(delegateMap.isEmpty());
-    }
-
-    @Override
-    public CompletableFuture<Void> clear() {
-        return asyncFuture(delegateMap.clear());
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicIdGenerator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicIdGenerator.java
deleted file mode 100644
index baf2b8a..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicIdGenerator.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import org.onosproject.store.service.AsyncAtomicIdGenerator;
-
-/**
- * {@link AsyncAtomicIdGenerator} that executes asynchronous callbacks on a user provided
- * {@link Executor}.
- */
-public class ExecutingAsyncAtomicIdGenerator extends ExecutingDistributedPrimitive implements AsyncAtomicIdGenerator {
-    private final AsyncAtomicIdGenerator delegateIdGenerator;
-
-    public ExecutingAsyncAtomicIdGenerator(
-            AsyncAtomicIdGenerator delegateIdGenerator, Executor orderedExecutor, Executor threadPoolExecutor) {
-        super(delegateIdGenerator, orderedExecutor, threadPoolExecutor);
-        this.delegateIdGenerator = delegateIdGenerator;
-    }
-
-    @Override
-    public CompletableFuture<Long> nextId() {
-        return asyncFuture(delegateIdGenerator.nextId());
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java
deleted file mode 100644
index c8bba52..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import com.google.common.collect.Maps;
-import org.onosproject.store.service.AsyncAtomicValue;
-import org.onosproject.store.service.AtomicValueEventListener;
-
-/**
- * {@link AsyncAtomicValue} that executes asynchronous callbacks on a user provided
- * {@link Executor}.
- */
-public class ExecutingAsyncAtomicValue<V> extends ExecutingDistributedPrimitive implements AsyncAtomicValue<V> {
-    private final AsyncAtomicValue<V> delegateValue;
-    private final Executor orderedExecutor;
-    private final Map<AtomicValueEventListener<V>, AtomicValueEventListener<V>> listenerMap = Maps.newConcurrentMap();
-
-    public ExecutingAsyncAtomicValue(
-            AsyncAtomicValue<V> delegateValue, Executor orderedExecutor, Executor threadPoolExecutor) {
-        super(delegateValue, orderedExecutor, threadPoolExecutor);
-        this.delegateValue = delegateValue;
-        this.orderedExecutor = orderedExecutor;
-    }
-
-    @Override
-    public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
-        return asyncFuture(delegateValue.compareAndSet(expect, update));
-    }
-
-    @Override
-    public CompletableFuture<V> get() {
-        return asyncFuture(delegateValue.get());
-    }
-
-    @Override
-    public CompletableFuture<V> getAndSet(V value) {
-        return asyncFuture(delegateValue.getAndSet(value));
-    }
-
-    @Override
-    public CompletableFuture<Void> set(V value) {
-        return asyncFuture(delegateValue.set(value));
-    }
-
-    @Override
-    public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
-        AtomicValueEventListener<V> wrappedListener = e -> orderedExecutor.execute(() -> listener.event(e));
-        listenerMap.put(listener, wrappedListener);
-        return asyncFuture(delegateValue.addListener(wrappedListener));
-    }
-
-    @Override
-    public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
-        AtomicValueEventListener<V> wrappedListener = listenerMap.remove(listener);
-        if (wrappedListener != null) {
-            return asyncFuture(delegateValue.removeListener(wrappedListener));
-        }
-        return CompletableFuture.completedFuture(null);
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMap.java
deleted file mode 100644
index d955121..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMap.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.BiFunction;
-import java.util.function.Predicate;
-
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.TransactionLog;
-import org.onosproject.store.service.Version;
-import org.onosproject.store.service.Versioned;
-
-/**
- * An {@link org.onosproject.store.service.AsyncConsistentMap} that completes asynchronous calls on a provided
- * {@link Executor}.
- */
-public class ExecutingAsyncConsistentMap<K, V>
-        extends ExecutingDistributedPrimitive implements AsyncConsistentMap<K, V> {
-    private final AsyncConsistentMap<K, V> delegateMap;
-
-    public ExecutingAsyncConsistentMap(
-            AsyncConsistentMap<K, V> delegateMap, Executor orderedExecutor, Executor threadPoolExecutor) {
-        super(delegateMap, orderedExecutor, threadPoolExecutor);
-        this.delegateMap = delegateMap;
-    }
-
-    @Override
-    public CompletableFuture<Integer> size() {
-        return asyncFuture(delegateMap.size());
-    }
-
-    @Override
-    public CompletableFuture<Boolean> containsKey(K key) {
-        return asyncFuture(delegateMap.containsKey(key));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> containsValue(V value) {
-        return asyncFuture(delegateMap.containsValue(value));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> get(K key) {
-        return asyncFuture(delegateMap.get(key));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> getOrDefault(K key, V defaultValue) {
-        return asyncFuture(delegateMap.getOrDefault(key, defaultValue));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> computeIf(
-            K key, Predicate<? super V> condition, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
-        return asyncFuture(delegateMap.computeIf(key, condition, remappingFunction));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> put(K key, V value) {
-        return asyncFuture(delegateMap.put(key, value));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
-        return asyncFuture(delegateMap.putAndGet(key, value));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> remove(K key) {
-        return asyncFuture(delegateMap.remove(key));
-    }
-
-    @Override
-    public CompletableFuture<Void> clear() {
-        return asyncFuture(delegateMap.clear());
-    }
-
-    @Override
-    public CompletableFuture<Set<K>> keySet() {
-        return asyncFuture(delegateMap.keySet());
-    }
-
-    @Override
-    public CompletableFuture<Collection<Versioned<V>>> values() {
-        return asyncFuture(delegateMap.values());
-    }
-
-    @Override
-    public CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> entrySet() {
-        return asyncFuture(delegateMap.entrySet());
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
-        return asyncFuture(delegateMap.putIfAbsent(key, value));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> remove(K key, V value) {
-        return asyncFuture(delegateMap.remove(key, value));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> remove(K key, long version) {
-        return asyncFuture(delegateMap.remove(key, version));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> replace(K key, V value) {
-        return asyncFuture(delegateMap.replace(key, value));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
-        return asyncFuture(delegateMap.replace(key, oldValue, newValue));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
-        return asyncFuture(delegateMap.replace(key, oldVersion, newValue));
-    }
-
-    @Override
-    public CompletableFuture<Version> begin(TransactionId transactionId) {
-        return asyncFuture(delegateMap.begin(transactionId));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
-        return asyncFuture(delegateMap.prepare(transactionLog));
-    }
-
-    @Override
-    public CompletableFuture<Void> commit(TransactionId transactionId) {
-        return asyncFuture(delegateMap.commit(transactionId));
-    }
-
-    @Override
-    public CompletableFuture<Void> rollback(TransactionId transactionId) {
-        return asyncFuture(delegateMap.rollback(transactionId));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
-        return asyncFuture(delegateMap.prepareAndCommit(transactionLog));
-    }
-
-    @Override
-    public CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
-        return addListener(listener);
-    }
-
-    @Override
-    public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
-        return asyncFuture(delegateMap.addListener(listener, executor));
-    }
-
-    @Override
-    public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
-        return asyncFuture(delegateMap.removeListener(listener));
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
deleted file mode 100644
index 2045586..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import com.google.common.collect.Multiset;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.MultimapEventListener;
-import org.onosproject.store.service.Versioned;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * {@link org.onosproject.store.service.AsyncConsistentMultimap} that executes asynchronous callbacks on a provided
- * {@link Executor}.
- */
-public class ExecutingAsyncConsistentMultimap<K, V>
-        extends ExecutingDistributedPrimitive implements AsyncConsistentMultimap<K, V> {
-    private final AsyncConsistentMultimap<K, V> delegateMap;
-
-    public ExecutingAsyncConsistentMultimap(
-            AsyncConsistentMultimap<K, V> delegateMap, Executor orderedExecutor, Executor threadPoolExecutor) {
-        super(delegateMap, orderedExecutor, threadPoolExecutor);
-        this.delegateMap = delegateMap;
-    }
-
-    @Override
-    public CompletableFuture<Integer> size() {
-        return asyncFuture(delegateMap.size());
-    }
-
-    @Override
-    public CompletableFuture<Boolean> isEmpty() {
-        return asyncFuture(delegateMap.isEmpty());
-    }
-
-    @Override
-    public CompletableFuture<Boolean> containsKey(K key) {
-        return asyncFuture(delegateMap.containsKey(key));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> containsValue(V value) {
-        return asyncFuture(delegateMap.containsValue(value));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> containsEntry(K key, V value) {
-        return asyncFuture(delegateMap.containsEntry(key, value));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> put(K key, V value) {
-        return asyncFuture(delegateMap.put(key, value));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> remove(K key, V value) {
-        return asyncFuture(delegateMap.remove(key, value));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> removeAll(K key, Collection<? extends V> values) {
-        return asyncFuture(delegateMap.removeAll(key, values));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<Collection<? extends V>>> removeAll(K key) {
-        return asyncFuture(delegateMap.removeAll(key));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> putAll(K key, Collection<? extends V> values) {
-        return asyncFuture(delegateMap.putAll(key, values));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(K key, Collection<V> values) {
-        return asyncFuture(delegateMap.replaceValues(key, values));
-    }
-
-    @Override
-    public CompletableFuture<Void> clear() {
-        return asyncFuture(delegateMap.clear());
-    }
-
-    @Override
-    public CompletableFuture<Versioned<Collection<? extends V>>> get(K key) {
-        return asyncFuture(delegateMap.get(key));
-    }
-
-    @Override
-    public CompletableFuture<Set<K>> keySet() {
-        return asyncFuture(delegateMap.keySet());
-    }
-
-    @Override
-    public CompletableFuture<Multiset<K>> keys() {
-        return asyncFuture(delegateMap.keys());
-    }
-
-    @Override
-    public CompletableFuture<Multiset<V>> values() {
-        return asyncFuture(delegateMap.values());
-    }
-
-    @Override
-    public CompletableFuture<Collection<Map.Entry<K, V>>> entries() {
-        return asyncFuture(delegateMap.entries());
-    }
-
-    @Override
-    public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
-        return asyncFuture(delegateMap.addListener(listener, executor));
-    }
-
-    @Override
-    public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
-        return asyncFuture(delegateMap.removeListener(listener));
-    }
-
-    @Override
-    public CompletableFuture<Map<K, Collection<V>>> asMap() {
-        return asyncFuture(delegateMap.asMap());
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentTreeMap.java
deleted file mode 100644
index 3a4fe85..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentTreeMap.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.BiFunction;
-import java.util.function.Predicate;
-
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.AsyncConsistentTreeMap;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.TransactionLog;
-import org.onosproject.store.service.Version;
-import org.onosproject.store.service.Versioned;
-
-/**
- * {@link org.onosproject.store.service.AsyncConsistentTreeMap} that executes asynchronous callbacks on a provided
- * {@link Executor}.
- */
-public class ExecutingAsyncConsistentTreeMap<V>
-        extends ExecutingDistributedPrimitive implements AsyncConsistentTreeMap<V> {
-    private final AsyncConsistentTreeMap<V> delegateMap;
-
-    public ExecutingAsyncConsistentTreeMap(
-            AsyncConsistentTreeMap<V> delegateMap, Executor orderedExecutor, Executor threadPoolExecutor) {
-        super(delegateMap, orderedExecutor, threadPoolExecutor);
-        this.delegateMap = delegateMap;
-    }
-
-    @Override
-    public CompletableFuture<String> firstKey() {
-        return asyncFuture(delegateMap.firstKey());
-    }
-
-    @Override
-    public CompletableFuture<String> lastKey() {
-        return asyncFuture(delegateMap.lastKey());
-    }
-
-    @Override
-    public CompletableFuture<Map.Entry<String, Versioned<V>>> ceilingEntry(String key) {
-        return asyncFuture(delegateMap.ceilingEntry(key));
-    }
-
-    @Override
-    public CompletableFuture<Map.Entry<String, Versioned<V>>> floorEntry(String key) {
-        return asyncFuture(delegateMap.floorEntry(key));
-    }
-
-    @Override
-    public CompletableFuture<Map.Entry<String, Versioned<V>>> higherEntry(String key) {
-        return asyncFuture(delegateMap.higherEntry(key));
-    }
-
-    @Override
-    public CompletableFuture<Map.Entry<String, Versioned<V>>> lowerEntry(String key) {
-        return asyncFuture(delegateMap.lowerEntry(key));
-    }
-
-    @Override
-    public CompletableFuture<Map.Entry<String, Versioned<V>>> firstEntry() {
-        return asyncFuture(delegateMap.firstEntry());
-    }
-
-    @Override
-    public CompletableFuture<Integer> size() {
-        return asyncFuture(delegateMap.size());
-    }
-
-    @Override
-    public CompletableFuture<Map.Entry<String, Versioned<V>>> lastEntry() {
-        return asyncFuture(delegateMap.lastEntry());
-    }
-
-    @Override
-    public CompletableFuture<Map.Entry<String, Versioned<V>>> pollFirstEntry() {
-        return asyncFuture(delegateMap.pollFirstEntry());
-    }
-
-    @Override
-    public CompletableFuture<Boolean> containsKey(String key) {
-        return asyncFuture(delegateMap.containsKey(key));
-    }
-
-    @Override
-    public CompletableFuture<Map.Entry<String, Versioned<V>>> pollLastEntry() {
-        return asyncFuture(delegateMap.pollLastEntry());
-    }
-
-    @Override
-    public CompletableFuture<String> lowerKey(String key) {
-        return asyncFuture(delegateMap.lowerKey(key));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> containsValue(V value) {
-        return asyncFuture(delegateMap.containsValue(value));
-    }
-
-    @Override
-    public CompletableFuture<String> floorKey(String key) {
-        return asyncFuture(delegateMap.floorKey(key));
-    }
-
-    @Override
-    public CompletableFuture<String> ceilingKey(String key) {
-        return asyncFuture(delegateMap.ceilingKey(key));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> get(String key) {
-        return asyncFuture(delegateMap.get(key));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> getOrDefault(String key, V defaultValue) {
-        return asyncFuture(delegateMap.getOrDefault(key, defaultValue));
-    }
-
-    @Override
-    public CompletableFuture<String> higherKey(String key) {
-        return asyncFuture(delegateMap.higherKey(key));
-    }
-
-    @Override
-    public CompletableFuture<NavigableSet<String>> navigableKeySet() {
-        return asyncFuture(delegateMap.navigableKeySet());
-    }
-
-    @Override
-    public CompletableFuture<NavigableMap<String, V>> subMap(
-            String upperKey, String lowerKey, boolean inclusiveUpper, boolean inclusiveLower) {
-        return asyncFuture(delegateMap.subMap(upperKey, lowerKey, inclusiveUpper, inclusiveLower));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> computeIf(
-            String key, Predicate<? super V> condition,
-            BiFunction<? super String, ? super V, ? extends V> remappingFunction) {
-        return asyncFuture(delegateMap.computeIf(key, condition, remappingFunction));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> put(String key, V value) {
-        return asyncFuture(delegateMap.put(key, value));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> putAndGet(String key, V value) {
-        return asyncFuture(delegateMap.putAndGet(key, value));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> remove(String key) {
-        return asyncFuture(delegateMap.remove(key));
-    }
-
-    @Override
-    public CompletableFuture<Void> clear() {
-        return asyncFuture(delegateMap.clear());
-    }
-
-    @Override
-    public CompletableFuture<Set<String>> keySet() {
-        return asyncFuture(delegateMap.keySet());
-    }
-
-    @Override
-    public CompletableFuture<Collection<Versioned<V>>> values() {
-        return asyncFuture(delegateMap.values());
-    }
-
-    @Override
-    public CompletableFuture<Set<Map.Entry<String, Versioned<V>>>> entrySet() {
-        return asyncFuture(delegateMap.entrySet());
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> putIfAbsent(String key, V value) {
-        return asyncFuture(delegateMap.putIfAbsent(key, value));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> remove(String key, V value) {
-        return asyncFuture(delegateMap.remove(key, value));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> remove(String key, long version) {
-        return asyncFuture(delegateMap.remove(key, version));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> replace(String key, V value) {
-        return asyncFuture(delegateMap.replace(key, value));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> replace(String key, V oldValue, V newValue) {
-        return asyncFuture(delegateMap.replace(key, oldValue, newValue));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> replace(String key, long oldVersion, V newValue) {
-        return asyncFuture(delegateMap.replace(key, oldVersion, newValue));
-    }
-
-    @Override
-    public CompletableFuture<Version> begin(TransactionId transactionId) {
-        return asyncFuture(delegateMap.begin(transactionId));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, V>> transactionLog) {
-        return asyncFuture(delegateMap.prepare(transactionLog));
-    }
-
-    @Override
-    public CompletableFuture<Void> commit(TransactionId transactionId) {
-        return asyncFuture(delegateMap.commit(transactionId));
-    }
-
-    @Override
-    public CompletableFuture<Void> rollback(TransactionId transactionId) {
-        return asyncFuture(delegateMap.rollback(transactionId));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, V>> transactionLog) {
-        return asyncFuture(delegateMap.prepareAndCommit(transactionLog));
-    }
-
-    @Override
-    public CompletableFuture<Void> addListener(MapEventListener<String, V> listener) {
-        return addListener(listener);
-    }
-
-    @Override
-    public CompletableFuture<Void> addListener(MapEventListener<String, V> listener, Executor executor) {
-        return asyncFuture(delegateMap.addListener(listener, executor));
-    }
-
-    @Override
-    public CompletableFuture<Void> removeListener(MapEventListener<String, V> listener) {
-        return asyncFuture(delegateMap.removeListener(listener));
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncDocumentTree.java
deleted file mode 100644
index f6fc3d1..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncDocumentTree.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import com.google.common.collect.Maps;
-import org.onosproject.store.service.AsyncDocumentTree;
-import org.onosproject.store.service.DocumentPath;
-import org.onosproject.store.service.DocumentTreeListener;
-import org.onosproject.store.service.Versioned;
-
-/**
- * {@link AsyncDocumentTree} that executes asynchronous callbacks on a user provided
- * {@link Executor}.
- */
-public class ExecutingAsyncDocumentTree<V> extends ExecutingDistributedPrimitive implements AsyncDocumentTree<V> {
-    private final AsyncDocumentTree<V> delegateTree;
-    private final Executor orderedExecutor;
-    private final Map<DocumentTreeListener<V>, DocumentTreeListener<V>> listenerMap = Maps.newConcurrentMap();
-
-    public ExecutingAsyncDocumentTree(
-            AsyncDocumentTree<V> delegateTree, Executor orderedExecutor, Executor threadPoolExecutor) {
-        super(delegateTree, orderedExecutor, threadPoolExecutor);
-        this.delegateTree = delegateTree;
-        this.orderedExecutor = orderedExecutor;
-    }
-
-    @Override
-    public DocumentPath root() {
-        return delegateTree.root();
-    }
-
-    @Override
-    public CompletableFuture<Map<String, Versioned<V>>> getChildren(DocumentPath path) {
-        return asyncFuture(delegateTree.getChildren(path));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> get(DocumentPath path) {
-        return asyncFuture(delegateTree.get(path));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> set(DocumentPath path, V value) {
-        return asyncFuture(delegateTree.set(path, value));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> create(DocumentPath path, V value) {
-        return asyncFuture(delegateTree.create(path, value));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> createRecursive(DocumentPath path, V value) {
-        return asyncFuture(delegateTree.createRecursive(path, value));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version) {
-        return asyncFuture(delegateTree.replace(path, newValue, version));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue) {
-        return asyncFuture(delegateTree.replace(path, newValue, currentValue));
-    }
-
-    @Override
-    public CompletableFuture<Versioned<V>> removeNode(DocumentPath path) {
-        return asyncFuture(delegateTree.removeNode(path));
-    }
-
-    @Override
-    public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<V> listener) {
-        DocumentTreeListener<V> wrappedListener = e -> orderedExecutor.execute(() -> listener.event(e));
-        listenerMap.put(listener, wrappedListener);
-        return asyncFuture(delegateTree.addListener(path, wrappedListener));
-    }
-
-    @Override
-    public CompletableFuture<Void> removeListener(DocumentTreeListener<V> listener) {
-        DocumentTreeListener<V> wrappedListener = listenerMap.remove(listener);
-        if (wrappedListener != null) {
-            return asyncFuture(delegateTree.removeListener(wrappedListener));
-        }
-        return CompletableFuture.completedFuture(null);
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncLeaderElector.java
deleted file mode 100644
index ba7cb81..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncLeaderElector.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.Consumer;
-
-import com.google.common.collect.Maps;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.Change;
-import org.onosproject.store.service.AsyncLeaderElector;
-
-/**
- * {@link AsyncLeaderElector} that executes asynchronous callbacks on a user provided
- * {@link Executor}.
- */
-public class ExecutingAsyncLeaderElector extends ExecutingDistributedPrimitive implements AsyncLeaderElector {
-    private final AsyncLeaderElector delegateElector;
-    private final Executor orderedExecutor;
-    private final Map<Consumer<Change<Leadership>>, Consumer<Change<Leadership>>> listenerMap = Maps.newConcurrentMap();
-
-    public ExecutingAsyncLeaderElector(
-            AsyncLeaderElector delegateElector, Executor orderedExecutor, Executor threadPoolExecutor) {
-        super(delegateElector, orderedExecutor, threadPoolExecutor);
-        this.delegateElector = delegateElector;
-        this.orderedExecutor = orderedExecutor;
-    }
-
-    @Override
-    public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
-        return asyncFuture(delegateElector.run(topic, nodeId));
-    }
-
-    @Override
-    public CompletableFuture<Void> withdraw(String topic) {
-        return asyncFuture(delegateElector.withdraw(topic));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
-        return asyncFuture(delegateElector.anoint(topic, nodeId));
-    }
-
-    @Override
-    public CompletableFuture<Void> evict(NodeId nodeId) {
-        return asyncFuture(delegateElector.evict(nodeId));
-    }
-
-    @Override
-    public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
-        return asyncFuture(delegateElector.promote(topic, nodeId));
-    }
-
-    @Override
-    public CompletableFuture<Leadership> getLeadership(String topic) {
-        return asyncFuture(delegateElector.getLeadership(topic));
-    }
-
-    @Override
-    public CompletableFuture<Map<String, Leadership>> getLeaderships() {
-        return asyncFuture(delegateElector.getLeaderships());
-    }
-
-    @Override
-    public CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> listener) {
-        Consumer<Change<Leadership>> wrappedListener = e -> orderedExecutor.execute(() -> listener.accept(e));
-        listenerMap.put(listener, wrappedListener);
-        return asyncFuture(delegateElector.addChangeListener(wrappedListener));
-    }
-
-    @Override
-    public CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> listener) {
-        Consumer<Change<Leadership>> wrappedListener = listenerMap.remove(listener);
-        if (wrappedListener != null) {
-            return asyncFuture(delegateElector.removeChangeListener(wrappedListener));
-        }
-        return CompletableFuture.completedFuture(null);
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java
deleted file mode 100644
index 836a682..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.Consumer;
-
-import com.google.common.collect.Maps;
-import org.onlab.util.Tools;
-import org.onosproject.store.service.DistributedPrimitive;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Base class for primitives that delegate asynchronous callbacks to a user provided {@link Executor}.
- */
-public abstract class ExecutingDistributedPrimitive
-        extends DelegatingDistributedPrimitive {
-    private final DistributedPrimitive primitive;
-    private final Executor orderedExecutor;
-    private final Executor threadPoolExecutor;
-    private final Map<Consumer<Status>, Consumer<Status>> listenerMap = Maps.newConcurrentMap();
-
-    protected ExecutingDistributedPrimitive(
-            DistributedPrimitive primitive, Executor orderedExecutor, Executor threadPoolExecutor) {
-        super(primitive);
-        this.primitive = primitive;
-        this.orderedExecutor = checkNotNull(orderedExecutor);
-        this.threadPoolExecutor = checkNotNull(threadPoolExecutor);
-    }
-
-    /**
-     * Creates a future to be completed asynchronously on the provided ordered and thread pool executors.
-     *
-     * @param future the future to be completed asynchronously
-     * @param <T> future result type
-     * @return a new {@link CompletableFuture} to be completed asynchronously using the primitive thread model
-     */
-    protected <T> CompletableFuture<T> asyncFuture(CompletableFuture<T> future) {
-        return Tools.orderedFuture(future, orderedExecutor, threadPoolExecutor);
-    }
-
-    @Override
-    public CompletableFuture<Void> destroy() {
-        return asyncFuture(primitive.destroy());
-    }
-
-    @Override
-    public void addStatusChangeListener(Consumer<DistributedPrimitive.Status> listener) {
-        Consumer<DistributedPrimitive.Status> wrappedListener =
-                status -> orderedExecutor.execute(() -> listener.accept(status));
-        listenerMap.put(listener, wrappedListener);
-        primitive.addStatusChangeListener(wrappedListener);
-    }
-
-    @Override
-    public void removeStatusChangeListener(Consumer<DistributedPrimitive.Status> listener) {
-        Consumer<DistributedPrimitive.Status> wrappedListener = listenerMap.remove(listener);
-        if (wrappedListener != null) {
-            primitive.removeStatusChangeListener(wrappedListener);
-        }
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingWorkQueue.java
deleted file mode 100644
index e6290b8..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingWorkQueue.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.Consumer;
-
-import org.onosproject.store.service.AsyncAtomicValue;
-import org.onosproject.store.service.Task;
-import org.onosproject.store.service.WorkQueue;
-import org.onosproject.store.service.WorkQueueStats;
-
-/**
- * {@link AsyncAtomicValue} that executes asynchronous callbacks on a user provided
- * {@link Executor}.
- */
-public class ExecutingWorkQueue<E> extends ExecutingDistributedPrimitive implements WorkQueue<E> {
-    private final WorkQueue<E> delegateQueue;
-
-    public ExecutingWorkQueue(WorkQueue<E> delegateQueue, Executor orderedExecutor, Executor threadPoolExecutor) {
-        super(delegateQueue, orderedExecutor, threadPoolExecutor);
-        this.delegateQueue = delegateQueue;
-    }
-
-    @Override
-    public CompletableFuture<Void> addMultiple(Collection<E> items) {
-        return asyncFuture(delegateQueue.addMultiple(items));
-    }
-
-    @Override
-    public CompletableFuture<Collection<Task<E>>> take(int maxItems) {
-        return asyncFuture(delegateQueue.take(maxItems));
-    }
-
-    @Override
-    public CompletableFuture<Void> complete(Collection<String> taskIds) {
-        return asyncFuture(delegateQueue.complete(taskIds));
-    }
-
-    @Override
-    public CompletableFuture<Void> registerTaskProcessor(
-            Consumer<E> taskProcessor, int parallelism, Executor executor) {
-        return asyncFuture(delegateQueue.registerTaskProcessor(taskProcessor, parallelism, executor));
-    }
-
-    @Override
-    public CompletableFuture<Void> stopProcessing() {
-        return asyncFuture(delegateQueue.stopProcessing());
-    }
-
-    @Override
-    public CompletableFuture<WorkQueueStats> stats() {
-        return asyncFuture(delegateQueue.stats());
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index 5564844..ea8d075 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -40,8 +40,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.Executor;
-import java.util.function.Supplier;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -60,13 +58,12 @@
     }
 
     @Override
-    public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(
-            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+    public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
         checkNotNull(name);
         checkNotNull(serializer);
         Map<PartitionId, AsyncConsistentMap<K, V>> maps =
                 Maps.transformValues(members,
-                                     partition -> partition.newAsyncConsistentMap(name, serializer, executorSupplier));
+                                     partition -> partition.newAsyncConsistentMap(name, serializer));
         Hasher<K> hasher = key -> {
             int hashCode = Hashing.sha256().hashBytes(serializer.encode(key)).asInt();
             return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
@@ -75,51 +72,46 @@
     }
 
     @Override
-    public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(
-            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
-        return getCreator(name).newAsyncConsistentTreeMap(name, serializer, executorSupplier);
+    public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name, Serializer serializer) {
+        return getCreator(name).newAsyncConsistentTreeMap(name, serializer);
     }
 
     @Override
-    public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(
-            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
-        return getCreator(name).newAsyncConsistentSetMultimap(name, serializer, executorSupplier);
+    public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(String name, Serializer serializer) {
+        return getCreator(name).newAsyncConsistentSetMultimap(name, serializer);
     }
 
     @Override
-    public <E> AsyncDistributedSet<E> newAsyncDistributedSet(
-            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
-        return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer, executorSupplier));
+    public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
+        return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer));
     }
 
     @Override
-    public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(
-            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
-        return getCreator(name).newAsyncAtomicCounterMap(name, serializer, executorSupplier);
+    public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(String name, Serializer serializer) {
+        return getCreator(name).newAsyncAtomicCounterMap(name, serializer);
     }
 
     @Override
-    public AsyncAtomicCounter newAsyncCounter(String name, Supplier<Executor> executorSupplier) {
-        return getCreator(name).newAsyncCounter(name, executorSupplier);
+    public AsyncAtomicCounter newAsyncCounter(String name) {
+        return getCreator(name).newAsyncCounter(name);
     }
 
     @Override
-    public AsyncAtomicIdGenerator newAsyncIdGenerator(String name, Supplier<Executor> executorSupplier) {
-        return getCreator(name).newAsyncIdGenerator(name, executorSupplier);
+    public AsyncAtomicIdGenerator newAsyncIdGenerator(String name) {
+        return getCreator(name).newAsyncIdGenerator(name);
     }
 
     @Override
-    public <V> AsyncAtomicValue<V> newAsyncAtomicValue(
-            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
-        return getCreator(name).newAsyncAtomicValue(name, serializer, executorSupplier);
+    public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
+        return getCreator(name).newAsyncAtomicValue(name, serializer);
     }
 
     @Override
-    public AsyncLeaderElector newAsyncLeaderElector(String name, Supplier<Executor> executorSupplier) {
+    public AsyncLeaderElector newAsyncLeaderElector(String name) {
         checkNotNull(name);
         Map<PartitionId, AsyncLeaderElector> leaderElectors =
                 Maps.transformValues(members,
-                                     partition -> partition.newAsyncLeaderElector(name, executorSupplier));
+                                     partition -> partition.newAsyncLeaderElector(name));
         Hasher<String> hasher = topic -> {
             int hashCode = Hashing.sha256().hashString(topic, Charsets.UTF_8).asInt();
             return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
@@ -128,14 +120,13 @@
     }
 
     @Override
-    public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer, Supplier<Executor> executorSupplier) {
-        return getCreator(name).newWorkQueue(name, serializer, executorSupplier);
+    public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
+        return getCreator(name).newWorkQueue(name, serializer);
     }
 
     @Override
-    public <V> AsyncDocumentTree<V> newAsyncDocumentTree(
-            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
-        return getCreator(name).newAsyncDocumentTree(name, serializer, executorSupplier);
+    public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
+        return getCreator(name).newAsyncDocumentTree(name, serializer);
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/OnosCopycatClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/OnosCopycatClient.java
deleted file mode 100644
index d541bb1..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/OnosCopycatClient.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onlab.util.Tools.maxPriority;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.net.ConnectException;
-import java.nio.channels.ClosedChannelException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Predicate;
-
-import org.onlab.util.Tools;
-import org.onosproject.store.service.StorageException;
-import org.slf4j.Logger;
-
-import com.google.common.base.Throwables;
-
-import io.atomix.catalyst.transport.TransportException;
-import io.atomix.copycat.Query;
-import io.atomix.copycat.client.CopycatClient;
-import io.atomix.copycat.error.QueryException;
-import io.atomix.copycat.error.UnknownSessionException;
-import io.atomix.copycat.session.ClosedSessionException;
-
-/**
- * Custom {@code CopycatClient} for injecting additional logic that runs before/after operation submission.
- */
-public class OnosCopycatClient extends DelegatingCopycatClient {
-
-    private final int maxRetries;
-    private final long delayBetweenRetriesMillis;
-    private final ScheduledExecutorService executor;
-    private final Logger log = getLogger(getClass());
-
-    private final Predicate<Throwable> retryableCheck = e -> e instanceof ConnectException
-            || e instanceof TimeoutException
-            || e instanceof TransportException
-            || e instanceof ClosedChannelException
-            || e instanceof QueryException
-            || e instanceof UnknownSessionException
-            || e instanceof ClosedSessionException
-            || e instanceof StorageException.Unavailable;
-
-    OnosCopycatClient(CopycatClient client, int maxRetries, long delayBetweenRetriesMillis) {
-        super(client);
-        this.maxRetries = maxRetries;
-        this.delayBetweenRetriesMillis = delayBetweenRetriesMillis;
-        this.executor = newSingleThreadScheduledExecutor(maxPriority(groupedThreads("OnosCopycat", "client", log)));
-    }
-
-    @Override
-    public CompletableFuture<Void> close() {
-        executor.shutdown();
-        return super.close();
-    }
-
-    @Override
-    public <T> CompletableFuture<T> submit(Query<T> query) {
-        if (state() == State.CLOSED) {
-            return Tools.exceptionalFuture(new StorageException.Unavailable());
-        }
-        CompletableFuture<T> future = new CompletableFuture<>();
-        executor.execute(() -> submit(query, 1, future));
-        return future;
-    }
-
-    private <T> void submit(Query<T> query, int attemptIndex, CompletableFuture<T> future) {
-        client.submit(query).whenComplete((r, e) -> {
-            if (e != null) {
-                if (attemptIndex < maxRetries + 1 && retryableCheck.test(Throwables.getRootCause(e))) {
-                    log.debug("Retry attempt ({} of {}). Failure due to {}",
-                            attemptIndex, maxRetries, Throwables.getRootCause(e).getClass());
-                    executor.schedule(() ->
-                        submit(query, attemptIndex + 1, future), delayBetweenRetriesMillis, TimeUnit.MILLISECONDS);
-                } else {
-                    future.completeExceptionally(e);
-                }
-            } else {
-                future.complete(r);
-            }
-        });
-    }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index bacda00..2201bb5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -16,19 +16,16 @@
 
 package org.onosproject.store.primitives.impl;
 
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
 import java.io.File;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -46,7 +43,7 @@
 import org.onosproject.cluster.PartitionDiff;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.store.cluster.messaging.MessagingService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.primitives.PartitionAdminService;
 import org.onosproject.store.primitives.PartitionEvent;
@@ -56,11 +53,9 @@
 import org.onosproject.store.service.PartitionInfo;
 import org.slf4j.Logger;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-
 import static org.onosproject.security.AppGuard.checkPermission;
 import static org.onosproject.security.AppPermission.Type.PARTITION_READ;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Implementation of {@code PartitionService} and {@code PartitionAdminService}.
@@ -73,7 +68,7 @@
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected MessagingService messagingService;
+    protected ClusterCommunicationService clusterCommunicator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterMetadataService metadataService;
@@ -84,9 +79,6 @@
     private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
     private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
     private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
-    private final ExecutorService sharedPrimitiveExecutor = Executors.newFixedThreadPool(
-            Runtime.getRuntime().availableProcessors(),
-            groupedThreads("onos/primitives", "primitive-events", log));
 
     @Activate
     public void activate() {
@@ -96,10 +88,8 @@
         currentClusterMetadata.get()
                        .getPartitions()
                        .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
-                               messagingService,
+                               clusterCommunicator,
                                clusterService,
-                               CatalystSerializers.getSerializer(),
-                               sharedPrimitiveExecutor,
                                new File(System.getProperty("karaf.data") + "/partitions/" + partition.getId()))));
 
         CompletableFuture<Void> openFuture = CompletableFuture.allOf(partitions.values()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
new file mode 100644
index 0000000..099e17a
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.session.SessionId;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.service.Serializer;
+
+/**
+ * {@link RaftClientProtocol} implementation that sends Raft client messages through a
+ * {@link ClusterCommunicationService}.
+ * <p>
+ * Request/response operations use the {@code sendAndReceive} helper inherited from {@link RaftCommunicator};
+ * reset and publish messages use the cluster communicator directly, on subjects derived from the session id.
+ */
+public class RaftClientCommunicator extends RaftCommunicator implements RaftClientProtocol {
+
+    /**
+     * Creates a client communicator for the given partition.
+     *
+     * @param partitionId partition whose id is used to build the message context prefix
+     * @param serializer serializer passed to the base communicator
+     * @param clusterCommunicator cluster communication service passed to the base communicator
+     */
+    public RaftClientCommunicator(
+            PartitionId partitionId,
+            Serializer serializer,
+            ClusterCommunicationService clusterCommunicator) {
+        super(contextFor(partitionId), serializer, clusterCommunicator);
+    }
+
+    /**
+     * Builds the message context for a partition, using the prefix {@code partition-<id>}.
+     */
+    private static RaftMessageContext contextFor(PartitionId partitionId) {
+        return new RaftMessageContext(String.format("partition-%d", partitionId.id()));
+    }
+
+    @Override
+    public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, OpenSessionRequest request) {
+        return sendAndReceive(context.openSessionSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, CloseSessionRequest request) {
+        return sendAndReceive(context.closeSessionSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, KeepAliveRequest request) {
+        return sendAndReceive(context.keepAliveSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
+        return sendAndReceive(context.querySubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<CommandResponse> command(MemberId memberId, CommandRequest request) {
+        return sendAndReceive(context.commandSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<MetadataResponse> metadata(MemberId memberId, MetadataRequest request) {
+        return sendAndReceive(context.metadataSubject, request, memberId);
+    }
+
+    @Override
+    public void reset(Collection<MemberId> members, ResetRequest request) {
+        // Each Raft member id is mapped to the cluster node id with the same string value.
+        Set<NodeId> targets = members.stream()
+                .map(member -> NodeId.nodeId(member.id()))
+                .collect(Collectors.toSet());
+        clusterCommunicator.multicast(
+                request, context.resetSubject(request.session()), serializer::encode, targets);
+    }
+
+    @Override
+    public void registerPublishListener(SessionId sessionId, Consumer<PublishRequest> listener, Executor executor) {
+        clusterCommunicator.addSubscriber(
+                context.publishSubject(sessionId.id()), serializer::decode, listener, executor);
+    }
+
+    @Override
+    public void unregisterPublishListener(SessionId sessionId) {
+        clusterCommunicator.removeSubscriber(context.publishSubject(sessionId.id()));
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
new file mode 100644
index 0000000..b708a34
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.net.ConnectException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+import io.atomix.protocols.raft.RaftException;
+import io.atomix.protocols.raft.cluster.MemberId;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.cluster.messaging.MessagingException;
+import org.onosproject.store.service.Serializer;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Abstract base class for Raft protocol client/server.
+ * <p>
+ * Holds the message context, serializer and cluster communicator shared by subclasses and provides a
+ * request/response helper that translates certain cluster messaging failures into other exception types.
+ */
+public abstract class RaftCommunicator {
+    protected final RaftMessageContext context;
+    protected final Serializer serializer;
+    protected final ClusterCommunicationService clusterCommunicator;
+
+    /**
+     * Creates a communicator.
+     *
+     * @param context message context supplying the message subjects
+     * @param serializer serializer used to encode requests and decode responses
+     * @param clusterCommunicator cluster communication service used to exchange messages
+     * @throws NullPointerException if any argument is null
+     */
+    public RaftCommunicator(
+            RaftMessageContext context,
+            Serializer serializer,
+            ClusterCommunicationService clusterCommunicator) {
+        this.context = checkNotNull(context, "context cannot be null");
+        this.serializer = checkNotNull(serializer, "serializer cannot be null");
+        this.clusterCommunicator = checkNotNull(clusterCommunicator, "clusterCommunicator cannot be null");
+    }
+
+    /**
+     * Sends a request to a cluster node and returns a future for the decoded response.
+     * <p>
+     * On failure a {@link CompletionException} wrapper is unwrapped when it carries a cause, then
+     * {@link MessagingException.NoRemoteHandler} is reported as a {@link ConnectException} and
+     * {@link MessagingException.RemoteHandlerFailure} / {@link MessagingException.ProtocolException} as a
+     * {@link RaftException.ProtocolException}; any other error is passed through unchanged.
+     *
+     * @param subject message subject to send the request on
+     * @param request request payload
+     * @param memberId Raft member to send to; its id is used as the target node id
+     * @param <T> request type
+     * @param <U> response type
+     * @return future completed with the response, or exceptionally with the translated error
+     */
+    protected <T, U> CompletableFuture<U> sendAndReceive(MessageSubject subject, T request, MemberId memberId) {
+        CompletableFuture<U> future = new CompletableFuture<>();
+        clusterCommunicator.<T, U>sendAndReceive(
+                request, subject, serializer::encode, serializer::decode, NodeId.nodeId(memberId.id()))
+                .whenComplete((result, error) -> {
+                    if (error == null) {
+                        future.complete(result);
+                    } else {
+                        future.completeExceptionally(translate(error));
+                    }
+                });
+        return future;
+    }
+
+    /**
+     * Translates a failure from the cluster communicator into the exception handed to the caller's future.
+     */
+    private static Throwable translate(Throwable error) {
+        Throwable cause = error;
+        // Unwrap only when a cause is present: unwrapping a cause-less CompletionException would yield null,
+        // and completeExceptionally(null) throws inside the callback, leaving the returned future incomplete.
+        if (cause instanceof CompletionException && cause.getCause() != null) {
+            cause = cause.getCause();
+        }
+        if (cause instanceof MessagingException.NoRemoteHandler) {
+            return new ConnectException(cause.getMessage());
+        }
+        if (cause instanceof MessagingException.RemoteHandlerFailure
+                || cause instanceof MessagingException.ProtocolException) {
+            return new RaftException.ProtocolException(cause.getMessage());
+        }
+        return cause;
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
new file mode 100644
index 0000000..2deb86a
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftMessageContext.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+/**
+ * Message subjects used for Raft protocol messages, optionally qualified by a prefix.
+ * Fixed subjects are created once in the constructor; per-session subjects are created on demand.
+ */
+final class RaftMessageContext {
+    private final String prefix;
+    final MessageSubject openSessionSubject;
+    final MessageSubject closeSessionSubject;
+    final MessageSubject keepAliveSubject;
+    final MessageSubject querySubject;
+    final MessageSubject commandSubject;
+    final MessageSubject metadataSubject;
+    final MessageSubject joinSubject;
+    final MessageSubject leaveSubject;
+    final MessageSubject configureSubject;
+    final MessageSubject reconfigureSubject;
+    final MessageSubject installSubject;
+    final MessageSubject pollSubject;
+    final MessageSubject voteSubject;
+    final MessageSubject appendSubject;
+
+    RaftMessageContext(String prefix) {
+        this.prefix = prefix;
+        openSessionSubject = subjectFor("open");
+        closeSessionSubject = subjectFor("close");
+        keepAliveSubject = subjectFor("keep-alive");
+        querySubject = subjectFor("query");
+        commandSubject = subjectFor("command");
+        metadataSubject = subjectFor("metadata");
+        joinSubject = subjectFor("join");
+        leaveSubject = subjectFor("leave");
+        configureSubject = subjectFor("configure");
+        reconfigureSubject = subjectFor("reconfigure");
+        installSubject = subjectFor("install");
+        pollSubject = subjectFor("poll");
+        voteSubject = subjectFor("vote");
+        appendSubject = subjectFor("append");
+    }
+
+    /**
+     * Builds the subject for a message type: the bare type when the prefix is null, otherwise prefix-type.
+     */
+    private MessageSubject subjectFor(String type) {
+        return prefix == null
+                ? new MessageSubject(type)
+                : new MessageSubject(String.format("%s-%s", prefix, type));
+    }
+
+    /**
+     * Returns the subject on which publish messages for a session are exchanged.
+     *
+     * @param sessionId the session whose publish subject is requested
+     * @return the publish subject, qualified by the prefix when one is set
+     */
+    MessageSubject publishSubject(long sessionId) {
+        String name = prefix == null
+                ? String.format("publish-%d", sessionId)
+                : String.format("%s-publish-%d", prefix, sessionId);
+        return new MessageSubject(name);
+    }
+
+    /**
+     * Returns the subject on which reset messages for a session are exchanged.
+     *
+     * @param sessionId the session whose reset subject is requested
+     * @return the reset subject, qualified by the prefix when one is set
+     */
+    MessageSubject resetSubject(long sessionId) {
+        String name = prefix == null
+                ? String.format("reset-%d", sessionId)
+                : String.format("%s-reset-%d", prefix, sessionId);
+        return new MessageSubject(name);
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
new file mode 100644
index 0000000..b31810f
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
@@ -0,0 +1,301 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.protocol.AppendRequest;
+import io.atomix.protocols.raft.protocol.AppendResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.ConfigureRequest;
+import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.InstallRequest;
+import io.atomix.protocols.raft.protocol.InstallResponse;
+import io.atomix.protocols.raft.protocol.JoinRequest;
+import io.atomix.protocols.raft.protocol.JoinResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.LeaveRequest;
+import io.atomix.protocols.raft.protocol.LeaveResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.PollRequest;
+import io.atomix.protocols.raft.protocol.PollResponse;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.protocol.ReconfigureRequest;
+import io.atomix.protocols.raft.protocol.ReconfigureResponse;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.protocol.VoteRequest;
+import io.atomix.protocols.raft.protocol.VoteResponse;
+import io.atomix.protocols.raft.session.SessionId;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.service.Serializer;
+
+/**
+ * Raft server protocol that sends requests and registers handlers via a {@link ClusterCommunicationService}.
+ */
+public class RaftServerCommunicator extends RaftCommunicator implements RaftServerProtocol {
+
+    public RaftServerCommunicator( // message subjects are prefixed with "partition-<id>"
+            PartitionId partitionId,
+            Serializer serializer,
+            ClusterCommunicationService clusterCommunicator) {
+        super(new RaftMessageContext(String.format("partition-%d", partitionId.id())), serializer, clusterCommunicator);
+    }
+
+    @Override
+    public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId, OpenSessionRequest request) {
+        return sendAndReceive(context.openSessionSubject, request, memberId); // request/response to memberId
+    }
+
+    @Override
+    public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId, CloseSessionRequest request) {
+        return sendAndReceive(context.closeSessionSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId, KeepAliveRequest request) {
+        return sendAndReceive(context.keepAliveSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
+        return sendAndReceive(context.querySubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<CommandResponse> command(MemberId memberId, CommandRequest request) {
+        return sendAndReceive(context.commandSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<MetadataResponse> metadata(MemberId memberId, MetadataRequest request) {
+        return sendAndReceive(context.metadataSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) {
+        return sendAndReceive(context.joinSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) {
+        return sendAndReceive(context.leaveSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<ConfigureResponse> configure(MemberId memberId, ConfigureRequest request) {
+        return sendAndReceive(context.configureSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId, ReconfigureRequest request) {
+        return sendAndReceive(context.reconfigureSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) {
+        return sendAndReceive(context.installSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) {
+        return sendAndReceive(context.pollSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) {
+        return sendAndReceive(context.voteSubject, request, memberId);
+    }
+
+    @Override
+    public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) {
+        return sendAndReceive(context.appendSubject, request, memberId);
+    }
+
+    @Override
+    public void publish(MemberId memberId, PublishRequest request) {
+        clusterCommunicator.unicast(request, // one-way send; the result of unicast is not inspected
+                context.publishSubject(request.session()), serializer::encode, NodeId.nodeId(memberId.id()));
+    }
+
+    @Override
+    public void registerOpenSessionHandler( // addSubscriber args: subject, serializer::decode, handler, serializer::encode
+            Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> handler) {
+        clusterCommunicator.addSubscriber(context.openSessionSubject, serializer::decode, handler, serializer::encode);
+    }
+
+    @Override
+    public void unregisterOpenSessionHandler() {
+        clusterCommunicator.removeSubscriber(context.openSessionSubject);
+    }
+
+    @Override
+    public void registerCloseSessionHandler(
+            Function<CloseSessionRequest, CompletableFuture<CloseSessionResponse>> handler) {
+        clusterCommunicator.addSubscriber(context.closeSessionSubject, serializer::decode, handler, serializer::encode);
+    }
+
+    @Override
+    public void unregisterCloseSessionHandler() {
+        clusterCommunicator.removeSubscriber(context.closeSessionSubject);
+    }
+
+    @Override
+    public void registerKeepAliveHandler(Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> handler) {
+        clusterCommunicator.addSubscriber(context.keepAliveSubject, serializer::decode, handler, serializer::encode);
+    }
+
+    @Override
+    public void unregisterKeepAliveHandler() {
+        clusterCommunicator.removeSubscriber(context.keepAliveSubject);
+    }
+
+    @Override
+    public void registerQueryHandler(Function<QueryRequest, CompletableFuture<QueryResponse>> handler) {
+        clusterCommunicator.addSubscriber(context.querySubject, serializer::decode, handler, serializer::encode);
+    }
+
+    @Override
+    public void unregisterQueryHandler() {
+        clusterCommunicator.removeSubscriber(context.querySubject);
+    }
+
+    @Override
+    public void registerCommandHandler(Function<CommandRequest, CompletableFuture<CommandResponse>> handler) {
+        clusterCommunicator.addSubscriber(context.commandSubject, serializer::decode, handler, serializer::encode);
+    }
+
+    @Override
+    public void unregisterCommandHandler() {
+        clusterCommunicator.removeSubscriber(context.commandSubject);
+    }
+
+    @Override
+    public void registerMetadataHandler(Function<MetadataRequest, CompletableFuture<MetadataResponse>> handler) {
+        clusterCommunicator.addSubscriber(context.metadataSubject, serializer::decode, handler, serializer::encode);
+    }
+
+    @Override
+    public void unregisterMetadataHandler() {
+        clusterCommunicator.removeSubscriber(context.metadataSubject);
+    }
+
+    @Override
+    public void registerJoinHandler(Function<JoinRequest, CompletableFuture<JoinResponse>> handler) {
+        clusterCommunicator.addSubscriber(context.joinSubject, serializer::decode, handler, serializer::encode);
+    }
+
+    @Override
+    public void unregisterJoinHandler() {
+        clusterCommunicator.removeSubscriber(context.joinSubject);
+    }
+
+    @Override
+    public void registerLeaveHandler(Function<LeaveRequest, CompletableFuture<LeaveResponse>> handler) {
+        clusterCommunicator.addSubscriber(context.leaveSubject, serializer::decode, handler, serializer::encode);
+    }
+
+    @Override
+    public void unregisterLeaveHandler() {
+        clusterCommunicator.removeSubscriber(context.leaveSubject);
+    }
+
+    @Override
+    public void registerConfigureHandler(Function<ConfigureRequest, CompletableFuture<ConfigureResponse>> handler) {
+        clusterCommunicator.addSubscriber(context.configureSubject, serializer::decode, handler, serializer::encode);
+    }
+
+    @Override
+    public void unregisterConfigureHandler() {
+        clusterCommunicator.removeSubscriber(context.configureSubject);
+    }
+
+    @Override
+    public void registerReconfigureHandler(
+            Function<ReconfigureRequest, CompletableFuture<ReconfigureResponse>> handler) {
+        clusterCommunicator.addSubscriber(context.reconfigureSubject, serializer::decode, handler, serializer::encode);
+    }
+
+    @Override
+    public void unregisterReconfigureHandler() {
+        clusterCommunicator.removeSubscriber(context.reconfigureSubject);
+    }
+
+    @Override
+    public void registerInstallHandler(Function<InstallRequest, CompletableFuture<InstallResponse>> handler) {
+        clusterCommunicator.addSubscriber(context.installSubject, serializer::decode, handler, serializer::encode);
+    }
+
+    @Override
+    public void unregisterInstallHandler() {
+        clusterCommunicator.removeSubscriber(context.installSubject);
+    }
+
+    @Override
+    public void registerPollHandler(Function<PollRequest, CompletableFuture<PollResponse>> handler) {
+        clusterCommunicator.addSubscriber(context.pollSubject, serializer::decode, handler, serializer::encode);
+    }
+
+    @Override
+    public void unregisterPollHandler() {
+        clusterCommunicator.removeSubscriber(context.pollSubject);
+    }
+
+    @Override
+    public void registerVoteHandler(Function<VoteRequest, CompletableFuture<VoteResponse>> handler) {
+        clusterCommunicator.addSubscriber(context.voteSubject, serializer::decode, handler, serializer::encode);
+    }
+
+    @Override
+    public void unregisterVoteHandler() {
+        clusterCommunicator.removeSubscriber(context.voteSubject);
+    }
+
+    @Override
+    public void registerAppendHandler(Function<AppendRequest, CompletableFuture<AppendResponse>> handler) {
+        clusterCommunicator.addSubscriber(context.appendSubject, serializer::decode, handler, serializer::encode);
+    }
+
+    @Override
+    public void unregisterAppendHandler() {
+        clusterCommunicator.removeSubscriber(context.appendSubject);
+    }
+
+    @Override
+    public void registerResetListener(SessionId sessionId, Consumer<ResetRequest> listener, Executor executor) {
+        clusterCommunicator.addSubscriber(context.resetSubject(sessionId.id()), serializer::decode, listener, executor);
+    }
+
+    @Override
+    public void unregisterResetListener(SessionId sessionId) { // reset subjects are derived per session id
+        clusterCommunicator.removeSubscriber(context.resetSubject(sessionId.id()));
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
new file mode 100644
index 0000000..2db6aff
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageNamespaces.java
@@ -0,0 +1,216 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+
+import io.atomix.protocols.raft.RaftError;
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.cluster.RaftMember;
+import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
+import io.atomix.protocols.raft.event.RaftEvent;
+import io.atomix.protocols.raft.event.impl.DefaultEventType;
+import io.atomix.protocols.raft.operation.OperationType;
+import io.atomix.protocols.raft.operation.RaftOperation;
+import io.atomix.protocols.raft.operation.impl.DefaultOperationId;
+import io.atomix.protocols.raft.protocol.AppendRequest;
+import io.atomix.protocols.raft.protocol.AppendResponse;
+import io.atomix.protocols.raft.protocol.CloseSessionRequest;
+import io.atomix.protocols.raft.protocol.CloseSessionResponse;
+import io.atomix.protocols.raft.protocol.CommandRequest;
+import io.atomix.protocols.raft.protocol.CommandResponse;
+import io.atomix.protocols.raft.protocol.ConfigureRequest;
+import io.atomix.protocols.raft.protocol.ConfigureResponse;
+import io.atomix.protocols.raft.protocol.InstallRequest;
+import io.atomix.protocols.raft.protocol.InstallResponse;
+import io.atomix.protocols.raft.protocol.JoinRequest;
+import io.atomix.protocols.raft.protocol.JoinResponse;
+import io.atomix.protocols.raft.protocol.KeepAliveRequest;
+import io.atomix.protocols.raft.protocol.KeepAliveResponse;
+import io.atomix.protocols.raft.protocol.LeaveRequest;
+import io.atomix.protocols.raft.protocol.LeaveResponse;
+import io.atomix.protocols.raft.protocol.MetadataRequest;
+import io.atomix.protocols.raft.protocol.MetadataResponse;
+import io.atomix.protocols.raft.protocol.OpenSessionRequest;
+import io.atomix.protocols.raft.protocol.OpenSessionResponse;
+import io.atomix.protocols.raft.protocol.PollRequest;
+import io.atomix.protocols.raft.protocol.PollResponse;
+import io.atomix.protocols.raft.protocol.PublishRequest;
+import io.atomix.protocols.raft.protocol.QueryRequest;
+import io.atomix.protocols.raft.protocol.QueryResponse;
+import io.atomix.protocols.raft.protocol.RaftResponse;
+import io.atomix.protocols.raft.protocol.ReconfigureRequest;
+import io.atomix.protocols.raft.protocol.ReconfigureResponse;
+import io.atomix.protocols.raft.protocol.ResetRequest;
+import io.atomix.protocols.raft.protocol.VoteRequest;
+import io.atomix.protocols.raft.protocol.VoteResponse;
+import io.atomix.protocols.raft.session.RaftSessionMetadata;
+import io.atomix.protocols.raft.session.SessionId;
+import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
+import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
+import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
+import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
+import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
+import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
+import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
+import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
+import io.atomix.protocols.raft.storage.system.Configuration;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapOperations;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapEvents;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapOperations;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapEvents;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapEvents;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapOperations;
+import org.onosproject.store.primitives.resources.impl.AtomixCounterOperations;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorEvents;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueEvents;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueOperations;
+import org.onosproject.store.serializers.KryoNamespaces;
+
+/**
+ * {@link KryoNamespace} definitions used to serialize Raft protocol messages and Raft storage entries.
+ */
+public final class StorageNamespaces {
+
+    /**
+     * Raft protocol namespace: requests/responses, log entries, member/session types, primitive operations and events.
+     */
+    public static final KryoNamespace RAFT_PROTOCOL = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.BASIC)
+            .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID) // presumably the first id for the types below — confirm
+            .register(OpenSessionRequest.class)
+            .register(OpenSessionResponse.class)
+            .register(CloseSessionRequest.class)
+            .register(CloseSessionResponse.class)
+            .register(KeepAliveRequest.class)
+            .register(KeepAliveResponse.class)
+            .register(QueryRequest.class)
+            .register(QueryResponse.class)
+            .register(CommandRequest.class)
+            .register(CommandResponse.class)
+            .register(MetadataRequest.class)
+            .register(MetadataResponse.class)
+            .register(JoinRequest.class)
+            .register(JoinResponse.class)
+            .register(LeaveRequest.class)
+            .register(LeaveResponse.class)
+            .register(ConfigureRequest.class)
+            .register(ConfigureResponse.class)
+            .register(ReconfigureRequest.class)
+            .register(ReconfigureResponse.class)
+            .register(InstallRequest.class)
+            .register(InstallResponse.class)
+            .register(PollRequest.class)
+            .register(PollResponse.class)
+            .register(VoteRequest.class)
+            .register(VoteResponse.class)
+            .register(AppendRequest.class)
+            .register(AppendResponse.class)
+            .register(PublishRequest.class)
+            .register(ResetRequest.class)
+            .register(RaftResponse.Status.class)
+            .register(RaftError.class)
+            .register(RaftError.Type.class)
+            .register(ReadConsistency.class)
+            .register(RaftSessionMetadata.class)
+            .register(CloseSessionEntry.class)
+            .register(CommandEntry.class)
+            .register(ConfigurationEntry.class)
+            .register(InitializeEntry.class)
+            .register(KeepAliveEntry.class)
+            .register(MetadataEntry.class)
+            .register(OpenSessionEntry.class)
+            .register(QueryEntry.class)
+            .register(RaftOperation.class)
+            .register(RaftEvent.class)
+            .register(DefaultEventType.class)
+            .register(DefaultOperationId.class)
+            .register(OperationType.class)
+            .register(ReadConsistency.class) // NOTE(review): also registered above; may affect later ids — confirm
+            .register(ArrayList.class)
+            .register(LinkedList.class)
+            .register(Collections.emptyList().getClass())
+            .register(HashSet.class)
+            .register(DefaultRaftMember.class)
+            .register(MemberId.class)
+            .register(SessionId.class)
+            .register(RaftMember.Type.class)
+            .register(RaftMember.Status.class)
+            .register(Instant.class)
+            .register(Configuration.class)
+            .register(AtomixAtomicCounterMapOperations.class)
+            .register(AtomixConsistentMapEvents.class)
+            .register(AtomixConsistentMapOperations.class)
+            .register(AtomixConsistentSetMultimapOperations.class)
+            .register(AtomixConsistentSetMultimapEvents.class)
+            .register(AtomixConsistentTreeMapEvents.class)
+            .register(AtomixConsistentTreeMapOperations.class)
+            .register(AtomixCounterOperations.class)
+            .register(AtomixDocumentTreeEvents.class)
+            .register(AtomixDocumentTreeOperations.class)
+            .register(AtomixLeaderElectorEvents.class)
+            .register(AtomixLeaderElectorOperations.class)
+            .register(AtomixWorkQueueEvents.class)
+            .register(AtomixWorkQueueOperations.class)
+            .build("RaftProtocol");
+
+    /**
+     * Raft storage namespace: log entries, member/configuration types and primitive operations (no protocol messages).
+     */
+    public static final KryoNamespace RAFT_STORAGE = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.BASIC)
+            .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID + 100) // 100 above the value passed for RAFT_PROTOCOL
+            .register(CloseSessionEntry.class)
+            .register(CommandEntry.class)
+            .register(ConfigurationEntry.class)
+            .register(InitializeEntry.class)
+            .register(KeepAliveEntry.class)
+            .register(MetadataEntry.class)
+            .register(OpenSessionEntry.class)
+            .register(QueryEntry.class)
+            .register(RaftOperation.class)
+            .register(ReadConsistency.class)
+            .register(ArrayList.class)
+            .register(HashSet.class)
+            .register(DefaultRaftMember.class)
+            .register(MemberId.class)
+            .register(RaftMember.Type.class)
+            .register(RaftMember.Status.class)
+            .register(Instant.class)
+            .register(Configuration.class)
+            .register(AtomixAtomicCounterMapOperations.class)
+            .register(AtomixConsistentMapOperations.class)
+            .register(AtomixConsistentSetMultimapOperations.class)
+            .register(AtomixConsistentTreeMapOperations.class)
+            .register(AtomixCounterOperations.class)
+            .register(AtomixDocumentTreeOperations.class)
+            .register(AtomixLeaderElectorOperations.class)
+            .register(AtomixWorkQueueOperations.class)
+            .build("RaftStorage");
+
+    private StorageNamespaces() { // holder for constants; not instantiable
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index e9261c8..051c95a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -15,31 +15,36 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.resource.ResourceType;
-
 import java.io.File;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableMap;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.service.RaftService;
 import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.Partition;
 import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapService;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapService;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapService;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapService;
+import org.onosproject.store.primitives.resources.impl.AtomixCounterService;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeService;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorService;
+import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueService;
+import org.onosproject.store.service.DistributedPrimitive;
 import org.onosproject.store.service.PartitionInfo;
-
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableSet;
+import org.onosproject.store.service.Serializer;
 
 /**
  * Storage partition.
@@ -47,32 +52,32 @@
 public class StoragePartition implements Managed<StoragePartition> {
 
     private final AtomicBoolean isOpened = new AtomicBoolean(false);
-    private final Serializer serializer;
-    private final Executor sharedExecutor;
-    private final MessagingService messagingService;
-    private final ClusterService clusterService;
+    private final ClusterCommunicationService clusterCommunicator;
     private final File logFolder;
     private Partition partition;
     private NodeId localNodeId;
     private StoragePartitionServer server;
     private StoragePartitionClient client;
 
-    public static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
-                                                                    new ResourceType(AtomixLeaderElector.class),
-                                                                    new ResourceType(AtomixConsistentMap.class));
+    public static final Map<String, Supplier<RaftService>> RAFT_SERVICES = // primitive type name -> service factory
+            ImmutableMap.<String, Supplier<RaftService>>builder()
+                    .put(DistributedPrimitive.Type.CONSISTENT_MAP.name(), AtomixConsistentMapService::new)
+                    .put(DistributedPrimitive.Type.CONSISTENT_TREEMAP.name(), AtomixConsistentTreeMapService::new)
+                    .put(DistributedPrimitive.Type.CONSISTENT_MULTIMAP.name(), AtomixConsistentSetMultimapService::new)
+                    .put(DistributedPrimitive.Type.COUNTER_MAP.name(), AtomixAtomicCounterMapService::new)
+                    .put(DistributedPrimitive.Type.COUNTER.name(), AtomixCounterService::new)
+                    .put(DistributedPrimitive.Type.LEADER_ELECTOR.name(), AtomixLeaderElectorService::new)
+                    .put(DistributedPrimitive.Type.WORK_QUEUE.name(), AtomixWorkQueueService::new)
+                    .put(DistributedPrimitive.Type.DOCUMENT_TREE.name(), AtomixDocumentTreeService::new)
+                    .build(); // each lookup yields a Supplier, so callers create a new service per get()
 
     public StoragePartition(Partition partition,
-            MessagingService messagingService,
+            ClusterCommunicationService clusterCommunicator,
             ClusterService clusterService,
-            Serializer serializer,
-            Executor sharedExecutor,
             File logFolder) {
         this.partition = partition;
-        this.messagingService = messagingService;
-        this.clusterService = clusterService;
+        this.clusterCommunicator = clusterCommunicator; // passed to RaftServerCommunicator by openServer
         this.localNodeId = clusterService.getLocalNode().id();
-        this.serializer = serializer;
-        this.sharedExecutor = sharedExecutor;
         this.logFolder = logFolder;
     }
 
@@ -87,10 +92,14 @@
     @Override
     public CompletableFuture<Void> open() {
         if (partition.getMembers().contains(localNodeId)) {
-            openServer();
+            return openServer()
+                    .thenCompose(v -> openClient())
+                    .thenAccept(v -> isOpened.set(true))
+                    .thenApply(v -> null);
         }
-        return openClient().thenAccept(v -> isOpened.set(true))
-                           .thenApply(v -> null);
+        return openClient()
+                .thenAccept(v -> isOpened.set(true))
+                .thenApply(v -> null);
     }
 
     @Override
@@ -117,11 +126,11 @@
     }
 
     /**
-     * Returns the {@link Address addresses} of partition members.
-     * @return partition member addresses
+     * Returns the {@link MemberId identifiers} of partition members.
+     * @return partition member identifiers
      */
-    public Collection<Address> getMemberAddresses() {
-        return Collections2.transform(partition.getMembers(), this::toAddress);
+    public Collection<MemberId> getMemberIds() {
+        return Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
     }
 
     /**
@@ -132,10 +141,12 @@
         if (!partition.getMembers().contains(localNodeId) || server != null) {
             return CompletableFuture.completedFuture(null);
         }
-        StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
-                this,
-                serializer,
-                () -> new CopycatTransport(partition.getId(), messagingService),
+        StoragePartitionServer server = new StoragePartitionServer(this,
+                MemberId.from(localNodeId.id()),
+                () -> new RaftServerCommunicator(
+                        partition.getId(),
+                        Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+                        clusterCommunicator),
                 logFolder);
         return server.open().thenRun(() -> this.server = server);
     }
@@ -149,19 +160,24 @@
                  .stream()
                  .filter(nodeId -> !nodeId.equals(localNodeId))
                  .collect(Collectors.toSet());
-        StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
-                this,
-                serializer,
-                () -> new CopycatTransport(partition.getId(), messagingService),
+        StoragePartitionServer server = new StoragePartitionServer(this,
+                MemberId.from(localNodeId.id()),
+                () -> new RaftServerCommunicator(
+                        partition.getId(),
+                        Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+                        clusterCommunicator),
                 logFolder);
-        return server.join(Collections2.transform(otherMembers, this::toAddress)).thenRun(() -> this.server = server);
+        return server.join(Collections2.transform(otherMembers, n -> MemberId.from(n.id())))
+                .thenRun(() -> this.server = server);
     }
 
     private CompletableFuture<StoragePartitionClient> openClient() {
         client = new StoragePartitionClient(this,
-                serializer,
-                new CopycatTransport(partition.getId(), messagingService),
-                sharedExecutor);
+                MemberId.from(localNodeId.id()),
+                new RaftClientCommunicator(
+                        partition.getId(),
+                        Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+                        clusterCommunicator));
         return client.open().thenApply(v -> client);
     }
 
@@ -185,11 +201,6 @@
         return CompletableFuture.completedFuture(null);
     }
 
-    private Address toAddress(NodeId nodeId) {
-        ControllerNode node = clusterService.getNode(nodeId);
-        return new Address(node.ip().toString(), node.tcpPort());
-    }
-
     /**
      * Returns the partition information if this partition is locally managed i.e.
      * this node is a active member of the partition.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index d97868c..b55efc4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -15,30 +15,19 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import java.util.Collection;
+import java.time.Duration;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Suppliers;
-import io.atomix.AtomixClient;
-import io.atomix.catalyst.transport.Transport;
-import io.atomix.copycat.client.ConnectionStrategies;
-import io.atomix.copycat.client.CopycatClient;
-import io.atomix.copycat.client.CopycatClient.State;
-import io.atomix.copycat.client.RecoveryStrategies;
-import io.atomix.copycat.client.ServerSelectionStrategies;
-import io.atomix.manager.ResourceClient;
-import io.atomix.manager.ResourceManagerException;
-import io.atomix.manager.util.ResourceManagerTypeResolver;
-import io.atomix.resource.ResourceRegistry;
-import io.atomix.resource.ResourceType;
-import io.atomix.variables.DistributedLong;
+import io.atomix.protocols.raft.RaftClient;
+import io.atomix.protocols.raft.ReadConsistency;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.protocol.RaftClientProtocol;
+import io.atomix.protocols.raft.proxy.CommunicationStrategy;
+import io.atomix.protocols.raft.session.RaftSessionMetadata;
 import org.onlab.util.HexString;
-import org.onlab.util.OrderedExecutor;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMap;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
@@ -60,7 +49,7 @@
 import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.AsyncLeaderElector;
-import org.onosproject.store.service.DistributedPrimitive.Status;
+import org.onosproject.store.service.DistributedPrimitive;
 import org.onosproject.store.service.PartitionClientInfo;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.WorkQueue;
@@ -76,55 +65,33 @@
     private final Logger log = getLogger(getClass());
 
     private final StoragePartition partition;
-    private final Transport transport;
-    private final io.atomix.catalyst.serializer.Serializer serializer;
-    private final Executor sharedExecutor;
-    private AtomixClient client;
-    private ResourceClient resourceClient;
+    private final MemberId localMemberId;
+    private final RaftClientProtocol protocol;
+    private RaftClient client;
     private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
     private final com.google.common.base.Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
             Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
                                                           Serializer.using(KryoNamespaces.BASIC)));
-    Function<State, Status> mapper = state -> {
-                                        switch (state) {
-                                        case CONNECTED:
-                                            return Status.ACTIVE;
-                                        case SUSPENDED:
-                                            return Status.SUSPENDED;
-                                        case CLOSED:
-                                            return Status.INACTIVE;
-                                        default:
-                                            throw new IllegalStateException("Unknown state " + state);
-                                        }
-                                    };
 
-    public StoragePartitionClient(StoragePartition partition,
-            io.atomix.catalyst.serializer.Serializer serializer,
-            Transport transport,
-            Executor sharedExecutor) {
+    public StoragePartitionClient(StoragePartition partition, MemberId localMemberId, RaftClientProtocol protocol) {
         this.partition = partition;
-        this.serializer = serializer;
-        this.transport = transport;
-        this.sharedExecutor = sharedExecutor;
+        this.localMemberId = localMemberId;
+        this.protocol = protocol;
     }
 
     @Override
     public CompletableFuture<Void> open() {
         synchronized (StoragePartitionClient.this) {
-            resourceClient = newResourceClient(transport,
-                                             serializer.clone(),
-                                             StoragePartition.RESOURCE_TYPES);
-            resourceClient.client().onStateChange(state -> log.debug("Partition {} client state"
-                    + " changed to {}", partition.getId(), state));
-            client = new AtomixClient(resourceClient);
+            client = newRaftClient(protocol);
         }
-        return client.connect(partition.getMemberAddresses()).whenComplete((r, e) -> {
+        return client.connect(partition.getMemberIds()).whenComplete((r, e) -> {
             if (e == null) {
                 log.info("Successfully started client for partition {}", partition.getId());
             } else {
                 log.info("Failed to start client for partition {}", partition.getId(), e);
             }
         }).thenApply(v -> null);
+
     }
 
     @Override
@@ -132,25 +99,19 @@
         return client != null ? client.close() : CompletableFuture.completedFuture(null);
     }
 
-    /**
-     * Returns the executor provided by the given supplier or a serial executor if the supplier is {@code null}.
-     *
-     * @param executorSupplier the user-provided executor supplier
-     * @return the executor
-     */
-    private Executor defaultExecutor(Supplier<Executor> executorSupplier) {
-        return executorSupplier != null ? executorSupplier.get() : new OrderedExecutor(sharedExecutor);
-    }
-
     @Override
-    public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(
-            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
-        AtomixConsistentMap atomixConsistentMap = client.getResource(name, AtomixConsistentMap.class).join();
-        Consumer<State> statusListener = state -> {
-            atomixConsistentMap.statusChangeListeners()
-                               .forEach(listener -> listener.accept(mapper.apply(state)));
-        };
-        resourceClient.client().onStateChange(statusListener);
+    public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
+        AtomixConsistentMap atomixConsistentMap =
+                new AtomixConsistentMap(client.newProxyBuilder()
+                        .withName(name)
+                        .withServiceType(DistributedPrimitive.Type.CONSISTENT_MAP.name())
+                        .withReadConsistency(ReadConsistency.SEQUENTIAL)
+                        .withCommunicationStrategy(CommunicationStrategy.ANY)
+                        .withTimeout(Duration.ofSeconds(30))
+                        .withMaxRetries(5)
+                        .build()
+                        .open()
+                        .join());
 
         AsyncConsistentMap<String, byte[]> rawMap =
                 new DelegatingAsyncConsistentMap<String, byte[]>(atomixConsistentMap) {
@@ -162,24 +123,27 @@
 
         // We have to ensure serialization is done on the Copycat threads since Kryo is not thread safe.
         AsyncConsistentMap<K, V> transcodedMap = DistributedPrimitives.newTranscodingMap(rawMap,
-            key -> HexString.toHexString(serializer.encode(key)),
-            string -> serializer.decode(HexString.fromHexString(string)),
-            value -> value == null ? null : serializer.encode(value),
-            bytes -> serializer.decode(bytes));
+                key -> HexString.toHexString(serializer.encode(key)),
+                string -> serializer.decode(HexString.fromHexString(string)),
+                value -> value == null ? null : serializer.encode(value),
+                bytes -> serializer.decode(bytes));
 
-        return new ExecutingAsyncConsistentMap<>(transcodedMap, defaultExecutor(executorSupplier), sharedExecutor);
+        return transcodedMap;
     }
 
     @Override
-    public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(
-            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+    public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name, Serializer serializer) {
         AtomixConsistentTreeMap atomixConsistentTreeMap =
-                client.getResource(name, AtomixConsistentTreeMap.class).join();
-        Consumer<State> statusListener = state -> {
-            atomixConsistentTreeMap.statusChangeListeners()
-                    .forEach(listener -> listener.accept(mapper.apply(state)));
-        };
-        resourceClient.client().onStateChange(statusListener);
+                new AtomixConsistentTreeMap(client.newProxyBuilder()
+                        .withName(name)
+                        .withServiceType(DistributedPrimitive.Type.CONSISTENT_TREEMAP.name())
+                        .withReadConsistency(ReadConsistency.SEQUENTIAL)
+                        .withCommunicationStrategy(CommunicationStrategy.ANY)
+                        .withTimeout(Duration.ofSeconds(30))
+                        .withMaxRetries(5)
+                        .build()
+                        .open()
+                        .join());
 
         AsyncConsistentTreeMap<byte[]> rawMap =
                 new DelegatingAsyncConsistentTreeMap<byte[]>(atomixConsistentTreeMap) {
@@ -191,24 +155,26 @@
 
         AsyncConsistentTreeMap<V> transcodedMap =
                 DistributedPrimitives.<V, byte[]>newTranscodingTreeMap(
-                    rawMap,
-                    value -> value == null ? null : serializer.encode(value),
-                    bytes -> serializer.decode(bytes));
+                        rawMap,
+                        value -> value == null ? null : serializer.encode(value),
+                        bytes -> serializer.decode(bytes));
 
-        return new ExecutingAsyncConsistentTreeMap<>(transcodedMap, defaultExecutor(executorSupplier), sharedExecutor);
+        return transcodedMap;
     }
 
     @Override
-    public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(
-            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+    public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(String name, Serializer serializer) {
         AtomixConsistentSetMultimap atomixConsistentSetMultimap =
-                client.getResource(name, AtomixConsistentSetMultimap.class)
-                        .join();
-        Consumer<State> statusListener = state -> {
-            atomixConsistentSetMultimap.statusChangeListeners()
-                    .forEach(listener -> listener.accept(mapper.apply(state)));
-        };
-        resourceClient.client().onStateChange(statusListener);
+                new AtomixConsistentSetMultimap(client.newProxyBuilder()
+                        .withName(name)
+                        .withServiceType(DistributedPrimitive.Type.CONSISTENT_MULTIMAP.name())
+                        .withReadConsistency(ReadConsistency.SEQUENTIAL)
+                        .withCommunicationStrategy(CommunicationStrategy.ANY)
+                        .withTimeout(Duration.ofSeconds(30))
+                        .withMaxRetries(5)
+                        .build()
+                        .open()
+                        .join());
 
         AsyncConsistentMultimap<String, byte[]> rawMap =
                 new DelegatingAsyncConsistentMultimap<String, byte[]>(
@@ -227,97 +193,136 @@
                         value -> serializer.encode(value),
                         bytes -> serializer.decode(bytes));
 
-        return new ExecutingAsyncConsistentMultimap<>(transcodedMap, defaultExecutor(executorSupplier), sharedExecutor);
+        return transcodedMap;
     }
 
     @Override
-    public <E> AsyncDistributedSet<E> newAsyncDistributedSet(
-            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
-        return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer, executorSupplier));
+    public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
+        return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer));
     }
 
     @Override
-    public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(
-            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
-        AtomixAtomicCounterMap atomixAtomicCounterMap =
-                client.getResource(name, AtomixAtomicCounterMap.class)
-                        .join();
+    public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(String name, Serializer serializer) {
+        AtomixAtomicCounterMap atomixAtomicCounterMap = new AtomixAtomicCounterMap(client.newProxyBuilder()
+                .withName(name)
+                .withServiceType(DistributedPrimitive.Type.COUNTER_MAP.name())
+                .withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
+                .withCommunicationStrategy(CommunicationStrategy.LEADER)
+                .withTimeout(Duration.ofSeconds(30))
+                .withMaxRetries(5)
+                .build()
+                .open()
+                .join());
 
         AsyncAtomicCounterMap<K> transcodedMap =
-                DistributedPrimitives.<K, String>newTranscodingAtomicCounterMap(
-                       atomixAtomicCounterMap,
+                DistributedPrimitives.newTranscodingAtomicCounterMap(
+                        atomixAtomicCounterMap,
                         key -> HexString.toHexString(serializer.encode(key)),
                         string -> serializer.decode(HexString.fromHexString(string)));
 
-        return new ExecutingAsyncAtomicCounterMap<>(transcodedMap, defaultExecutor(executorSupplier), sharedExecutor);
+        return transcodedMap;
     }
 
     @Override
-    public AsyncAtomicCounter newAsyncCounter(String name, Supplier<Executor> executorSupplier) {
-        DistributedLong distributedLong = client.getLong(name).join();
-        AsyncAtomicCounter asyncCounter = new AtomixCounter(name, distributedLong);
-        return new ExecutingAsyncAtomicCounter(asyncCounter, defaultExecutor(executorSupplier), sharedExecutor);
+    public AsyncAtomicCounter newAsyncCounter(String name) {
+        return new AtomixCounter(client.newProxyBuilder()
+                .withName(name)
+                .withServiceType(DistributedPrimitive.Type.COUNTER.name())
+                .withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
+                .withCommunicationStrategy(CommunicationStrategy.LEADER)
+                .withTimeout(Duration.ofSeconds(30))
+                .withMaxRetries(5)
+                .build()
+                .open()
+                .join());
     }
 
     @Override
-    public AsyncAtomicIdGenerator newAsyncIdGenerator(String name, Supplier<Executor> executorSupplier) {
-        DistributedLong distributedLong = client.getLong(name).join();
-        AsyncAtomicIdGenerator asyncIdGenerator = new AtomixIdGenerator(name, distributedLong);
-        return new ExecutingAsyncAtomicIdGenerator(asyncIdGenerator, defaultExecutor(executorSupplier), sharedExecutor);
+    public AsyncAtomicIdGenerator newAsyncIdGenerator(String name) {
+        return new AtomixIdGenerator(newAsyncCounter(name));
     }
 
     @Override
-    public <V> AsyncAtomicValue<V> newAsyncAtomicValue(
-            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
-       AsyncAtomicValue<V> asyncValue = new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get());
-       return new ExecutingAsyncAtomicValue<>(asyncValue, defaultExecutor(executorSupplier), sharedExecutor);
+    public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
+        return new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get());
     }
 
     @Override
-    public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer, Supplier<Executor> executorSupplier) {
-        AtomixWorkQueue atomixWorkQueue = client.getResource(name, AtomixWorkQueue.class).join();
-        WorkQueue<E> workQueue = new DefaultDistributedWorkQueue<>(atomixWorkQueue, serializer);
-        return new ExecutingWorkQueue<>(workQueue, defaultExecutor(executorSupplier), sharedExecutor);
+    public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
+        AtomixWorkQueue atomixWorkQueue = new AtomixWorkQueue(client.newProxyBuilder()
+                .withName(name)
+                .withServiceType(DistributedPrimitive.Type.WORK_QUEUE.name())
+                .withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
+                .withCommunicationStrategy(CommunicationStrategy.LEADER)
+                .withTimeout(Duration.ofSeconds(5))
+                .withMaxRetries(5)
+                .build()
+                .open()
+                .join());
+        return new DefaultDistributedWorkQueue<>(atomixWorkQueue, serializer);
     }
 
     @Override
-    public <V> AsyncDocumentTree<V> newAsyncDocumentTree(
-            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
-        AtomixDocumentTree atomixDocumentTree = client.getResource(name, AtomixDocumentTree.class).join();
-        AsyncDocumentTree<V> asyncDocumentTree = new DefaultDistributedDocumentTree<>(
-                name, atomixDocumentTree, serializer);
-        return new ExecutingAsyncDocumentTree<>(asyncDocumentTree, defaultExecutor(executorSupplier), sharedExecutor);
+    public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
+        AtomixDocumentTree atomixDocumentTree = new AtomixDocumentTree(client.newProxyBuilder()
+                .withName(name)
+                .withServiceType(DistributedPrimitive.Type.DOCUMENT_TREE.name())
+                .withReadConsistency(ReadConsistency.SEQUENTIAL)
+                .withCommunicationStrategy(CommunicationStrategy.ANY)
+                .withTimeout(Duration.ofSeconds(30))
+                .withMaxRetries(5)
+                .build()
+                .open()
+                .join());
+        return new DefaultDistributedDocumentTree<>(name, atomixDocumentTree, serializer);
     }
 
     @Override
-    public AsyncLeaderElector newAsyncLeaderElector(String name, Supplier<Executor> executorSupplier) {
-        AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
-                                                  .thenCompose(AtomixLeaderElector::setupCache)
-                                                  .join();
-        Consumer<State> statusListener = state -> leaderElector.statusChangeListeners()
-                .forEach(listener -> listener.accept(mapper.apply(state)));
-        resourceClient.client().onStateChange(statusListener);
-        return new ExecutingAsyncLeaderElector(leaderElector, defaultExecutor(executorSupplier), sharedExecutor);
+    public AsyncLeaderElector newAsyncLeaderElector(String name) {
+        AtomixLeaderElector leaderElector = new AtomixLeaderElector(client.newProxyBuilder()
+                .withName(name)
+                .withServiceType(DistributedPrimitive.Type.LEADER_ELECTOR.name())
+                .withReadConsistency(ReadConsistency.LINEARIZABLE)
+                .withCommunicationStrategy(CommunicationStrategy.LEADER)
+                .withTimeout(Duration.ofSeconds(5))
+                .withMaxRetries(5)
+                .build()
+                .open()
+                .join());
+        leaderElector.setupCache().join();
+        return leaderElector;
     }
 
     @Override
     public Set<String> getAsyncConsistentMapNames() {
-        return client.keys(AtomixConsistentMap.class).join();
+        return client.metadata().getSessions(DistributedPrimitive.Type.CONSISTENT_MAP.name())
+                .join()
+                .stream()
+                .map(RaftSessionMetadata::serviceName)
+                .collect(Collectors.toSet());
     }
 
     @Override
     public Set<String> getAsyncAtomicCounterNames() {
-        return client.keys(DistributedLong.class).join();
+        return client.metadata().getSessions(DistributedPrimitive.Type.COUNTER.name())
+                .join()
+                .stream()
+                .map(RaftSessionMetadata::serviceName)
+                .collect(Collectors.toSet());
     }
 
     @Override
     public Set<String> getWorkQueueNames() {
-        return client.keys(AtomixWorkQueue.class).join();
+        return client.metadata().getSessions(DistributedPrimitive.Type.WORK_QUEUE.name())
+                .join()
+                .stream()
+                .map(RaftSessionMetadata::serviceName)
+                .collect(Collectors.toSet());
     }
 
     @Override
     public boolean isOpen() {
-        return resourceClient.client().state() != State.CLOSED;
+        return client != null;
     }
 
     /**
@@ -325,35 +330,21 @@
      * @return partition client information
      */
     public PartitionClientInfo clientInfo() {
-        return new PartitionClientInfo(partition.getId(),
-                partition.getMembers(),
-                resourceClient.client().session().id(),
-                mapper.apply(resourceClient.client().state()));
+        return new PartitionClientInfo(partition.getId(), partition.getMembers());
     }
 
-    private ResourceClient newResourceClient(Transport transport,
-                                           io.atomix.catalyst.serializer.Serializer serializer,
-                                           Collection<ResourceType> resourceTypes) {
-        ResourceRegistry registry = new ResourceRegistry();
-        resourceTypes.forEach(registry::register);
-        CopycatClient copycatClient = CopycatClient.builder()
-                .withServerSelectionStrategy(ServerSelectionStrategies.ANY)
-                .withConnectionStrategy(ConnectionStrategies.FIBONACCI_BACKOFF)
-                .withRecoveryStrategy(RecoveryStrategies.RECOVER)
-                .withTransport(transport)
-                .withSerializer(serializer)
+    /**
+     * Builds the Raft client used by this partition, identified as
+     * {@code "partition-"} followed by the partition id.
+     *
+     * @param protocol client protocol passed to the builder
+     * @return the built client
+     */
+    private RaftClient newRaftClient(RaftClientProtocol protocol) {
+        return RaftClient.newBuilder()
+                .withClientId("partition-" + partition.getId())
+                .withMemberId(localMemberId)
+                .withProtocol(protocol)
                 .build();
-        copycatClient.serializer().resolve(new ResourceManagerTypeResolver());
-        for (ResourceType type : registry.types()) {
-            try {
-                type.factory()
-                    .newInstance()
-                    .createSerializableTypeResolver()
-                    .resolve(copycatClient.serializer().registry());
-            } catch (InstantiationException | IllegalAccessException e) {
-                throw new ResourceManagerException(e);
-            }
-        }
-        return new ResourceClient(new OnosCopycatClient(copycatClient, 5, 100));
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
index 9305a31..9ae34d9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
@@ -15,13 +15,12 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import io.atomix.copycat.server.cluster.Member;
-
 import java.util.Collection;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import io.atomix.protocols.raft.cluster.RaftMember;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.service.PartitionInfo;
 
@@ -34,15 +33,15 @@
 public class StoragePartitionDetails {
 
     private final PartitionId partitionId;
-    private final Set<Member> activeMembers;
-    private final Set<Member> configuredMembers;
-    private final Member leader;
+    private final Set<RaftMember> activeMembers;
+    private final Set<RaftMember> configuredMembers;
+    private final RaftMember leader;
     private final long leaderTerm;
 
     public StoragePartitionDetails(PartitionId partitionId,
-            Collection<Member> activeMembers,
-            Collection<Member> configuredMembers,
-            Member leader,
+            Collection<RaftMember> activeMembers,
+            Collection<RaftMember> configuredMembers,
+            RaftMember leader,
             long leaderTerm) {
         this.partitionId = partitionId;
         this.activeMembers = ImmutableSet.copyOf(activeMembers);
@@ -55,7 +54,7 @@
      * Returns the set of active members.
      * @return active members
      */
-    public Set<Member> activeMembers() {
+    public Set<RaftMember> activeMembers() {
         return activeMembers;
     }
 
@@ -63,7 +62,7 @@
      * Returns the set of configured members.
      * @return configured members
      */
-    public Set<Member> configuredMembers() {
+    public Set<RaftMember> configuredMembers() {
         return configuredMembers;
     }
 
@@ -71,7 +70,7 @@
      * Returns the partition leader.
      * @return leader
      */
-    public Member leader() {
+    public RaftMember leader() {
         return leader;
     }
 
@@ -98,8 +97,8 @@
      * @return partition info
      */
     public PartitionInfo toPartitionInfo() {
-        Function<Member, String> memberToString =
-                m -> m == null ? "none" : String.format("%s:%d", m.address().host(), m.address().port());
+        Function<RaftMember, String> memberToString =
+                m -> m == null ? "none" : m.memberId().toString();
         return new PartitionInfo(partitionId.toString(),
                 leaderTerm,
                 activeMembers.stream().map(memberToString).collect(Collectors.toList()),
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index f0c0609..c1ea37c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -15,24 +15,23 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import static org.slf4j.LoggerFactory.getLogger;
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Transport;
-import io.atomix.copycat.server.CopycatServer;
-import io.atomix.copycat.server.storage.Storage;
-import io.atomix.copycat.server.storage.StorageLevel;
-import io.atomix.manager.internal.ResourceManagerState;
-import io.atomix.manager.util.ResourceManagerTypeResolver;
-
 import java.io.File;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
+import io.atomix.protocols.raft.RaftServer;
+import io.atomix.protocols.raft.cluster.MemberId;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.storage.RaftStorage;
+import io.atomix.storage.StorageLevel;
+import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
 import org.onosproject.store.service.PartitionInfo;
+import org.onosproject.store.service.Serializer;
 import org.slf4j.Logger;
 
+import static org.slf4j.LoggerFactory.getLogger;
+
 /**
  * {@link StoragePartition} server.
  */
@@ -41,36 +40,34 @@
     private final Logger log = getLogger(getClass());
 
     private static final int MAX_ENTRIES_PER_LOG_SEGMENT = 32768;
+    private final MemberId localMemberId;
     private final StoragePartition partition;
-    private final Address localAddress;
-    private final Supplier<Transport> transport;
-    private final Serializer serializer;
+    private final Supplier<RaftServerProtocol> protocol;
     private final File dataFolder;
-    private CopycatServer server;
+    private RaftServer server;
 
-    public StoragePartitionServer(Address localAddress,
+    public StoragePartitionServer(
             StoragePartition partition,
-            Serializer serializer,
-            Supplier<Transport> transport,
+            MemberId localMemberId,
+            Supplier<RaftServerProtocol> protocol,
             File dataFolder) {
         this.partition = partition;
-        this.localAddress = localAddress;
-        this.serializer = serializer;
-        this.transport = transport;
+        this.localMemberId = localMemberId;
+        this.protocol = protocol;
         this.dataFolder = dataFolder;
     }
 
     @Override
     public CompletableFuture<Void> open() {
-        CompletableFuture<CopycatServer> serverOpenFuture;
-        if (partition.getMemberAddresses().contains(localAddress)) {
+        CompletableFuture<RaftServer> serverOpenFuture;
+        if (partition.getMemberIds().contains(localMemberId)) {
             if (server != null && server.isRunning()) {
                 return CompletableFuture.completedFuture(null);
             }
             synchronized (this) {
                 server = buildServer();
             }
-            serverOpenFuture = server.bootstrap(partition.getMemberAddresses());
+            serverOpenFuture = server.bootstrap(partition.getMemberIds());
         } else {
             serverOpenFuture = CompletableFuture.completedFuture(null);
         }
@@ -96,24 +93,21 @@
         return server.leave();
     }
 
-    private CopycatServer buildServer() {
-        CopycatServer server = CopycatServer.builder(localAddress)
+    private RaftServer buildServer() {
+        RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
                 .withName("partition-" + partition.getId())
-                .withSerializer(serializer.clone())
-                .withTransport(transport.get())
-                .withStateMachine(ResourceManagerState::new)
-                .withStorage(Storage.builder()
+                .withProtocol(protocol.get())
+                .withStorage(RaftStorage.newBuilder()
                         .withStorageLevel(StorageLevel.DISK)
-                        .withCompactionThreads(1)
+                        .withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
                         .withDirectory(dataFolder)
                         .withMaxEntriesPerSegment(MAX_ENTRIES_PER_LOG_SEGMENT)
-                        .build())
-                .build();
-        server.serializer().resolve(new ResourceManagerTypeResolver());
-        return server;
+                        .build());
+        StoragePartition.RAFT_SERVICES.forEach(builder::addService);
+        return builder.build();
     }
 
-    public CompletableFuture<Void> join(Collection<Address> otherMembers) {
+    public CompletableFuture<Void> join(Collection<MemberId> otherMembers) {
         server = buildServer();
         return server.join(otherMembers).whenComplete((r, e) -> {
             if (e == null) {
@@ -135,9 +129,9 @@
      */
     public PartitionInfo info() {
         return new StoragePartitionDetails(partition.getId(),
-                server.cluster().members(),
-                server.cluster().members(),
-                server.cluster().leader(),
-                server.cluster().term()).toPartitionInfo();
+                server.cluster().getMembers(),
+                server.cluster().getMembers(),
+                server.cluster().getLeader(),
+                server.cluster().getTerm()).toPartitionInfo();
     }
 }