[ONOS-7054] Implement prototype of ISSU protocol

Change-Id: Id543c0de9c97b68f977c824cbc987b35d81beb2d
diff --git a/cli/src/main/java/org/onosproject/cli/IssuCommand.java b/cli/src/main/java/org/onosproject/cli/IssuCommand.java
new file mode 100644
index 0000000..1f0cb36
--- /dev/null
+++ b/cli/src/main/java/org/onosproject/cli/IssuCommand.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cli;
+
+import org.apache.karaf.shell.commands.Argument;
+import org.apache.karaf.shell.commands.Command;
+import org.onosproject.upgrade.UpgradeAdminService;
+import org.onosproject.upgrade.UpgradeService;
+
+/**
+ * Commands for managing upgrades.
+ */
+@Command(scope = "onos", name = "issu",
+        description = "Manages upgrades")
+public class IssuCommand extends AbstractShellCommand {
+
+    static final String INIT = "init";
+    static final String UPGRADE = "upgrade";
+    static final String COMMIT = "commit";
+    static final String ROLLBACK = "rollback";
+    static final String RESET = "reset";
+    static final String STATUS = "status";
+    static final String VERSION = "version";
+
+    @Argument(index = 0, name = "command",
+            description = "Command name (init|upgrade|commit|rollback|status|version)",
+            required = false, multiValued = false)
+    String command = null;
+
+    @Override
+    protected void execute() {
+        UpgradeService upgradeService = get(UpgradeService.class);
+        UpgradeAdminService upgradeAdminService = get(UpgradeAdminService.class);
+        if (command == null) {
+            print("source=%s, target=%s, status=%s, upgraded=%b, active=%b",
+                    upgradeService.getState().source(),
+                    upgradeService.getState().target(),
+                    upgradeService.getState().status(),
+                    upgradeService.isLocalUpgraded(),
+                    upgradeService.isLocalActive());
+        } else if (command.equals(INIT)) {
+            upgradeAdminService.initialize();
+            print("Initialized");
+        } else if (command.equals(UPGRADE)) {
+            upgradeAdminService.upgrade();
+            print("Upgraded");
+        } else if (command.equals(COMMIT)) {
+            upgradeAdminService.commit();
+            print("Committed version %s", upgradeService.getVersion());
+        } else if (command.equals(ROLLBACK)) {
+            upgradeAdminService.rollback();
+            print("Rolled back to version %s", upgradeService.getVersion());
+        } else if (command.equals(RESET)) {
+            upgradeAdminService.reset();
+            print("Reset version %s", upgradeService.getVersion());
+        } else if (command.equals(STATUS)) {
+            print("%s", upgradeService.getState().status());
+        } else if (command.equals(VERSION)) {
+            print("%s", upgradeService.getVersion());
+        } else {
+            print("Unsupported command: %s", command);
+        }
+    }
+
+}
diff --git a/cli/src/main/java/org/onosproject/cli/NodesListCommand.java b/cli/src/main/java/org/onosproject/cli/NodesListCommand.java
index bf53219..5a55c99 100644
--- a/cli/src/main/java/org/onosproject/cli/NodesListCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/NodesListCommand.java
@@ -22,8 +22,8 @@
 import org.apache.karaf.shell.commands.Command;
 import org.joda.time.DateTime;
 import org.onlab.util.Tools;
-import org.onosproject.cluster.ClusterAdminService;
 import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.MembershipAdminService;
 import org.onosproject.core.Version;
 import org.onosproject.utils.Comparators;
 
@@ -44,7 +44,7 @@
 
     @Override
     protected void execute() {
-        ClusterAdminService service = get(ClusterAdminService.class);
+        MembershipAdminService service = get(MembershipAdminService.class);
         List<ControllerNode> nodes = newArrayList(service.getNodes());
         Collections.sort(nodes, Comparators.NODE_COMPARATOR);
         if (outputJson()) {
@@ -68,7 +68,7 @@
     }
 
     // Produces JSON structure.
-    private JsonNode json(ClusterAdminService service, List<ControllerNode> nodes) {
+    private JsonNode json(MembershipAdminService service, List<ControllerNode> nodes) {
         ObjectMapper mapper = new ObjectMapper();
         ArrayNode result = mapper.createArrayNode();
         ControllerNode self = service.getLocalNode();
diff --git a/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java b/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java
index 0922d39..c9df46a 100644
--- a/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java
@@ -66,7 +66,7 @@
             boolean first = true;
             for (String member : Ordering.natural().sortedCopy(info.members())) {
                 if (first) {
-                    print(SERVER_FMT, info.name(), info.term(), member,
+                    print(SERVER_FMT, info.id(), info.term(), member,
                             member.equals(info.leader()) ? "*" : "");
                     first = false;
                 } else {
@@ -130,7 +130,7 @@
             info.members().forEach(members::add);
 
             // Complete the partition attributes and add it to the array
-            partition.put("name", info.name())
+            partition.put("name", info.id().toString())
                     .put("term", info.term())
                     .put("leader", info.leader());
             partitions.add(partition);
diff --git a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 1ffeed4..9e176b6 100644
--- a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -32,6 +32,11 @@
         <command>
             <action class="org.onosproject.cli.SummaryCommand"/>
         </command>
+
+        <command>
+            <action class="org.onosproject.cli.IssuCommand"/>
+        </command>
+
         <command>
             <action class="org.onosproject.cli.security.ReviewCommand"/>
             <completers>
diff --git a/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java b/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java
index 625183a..e680877 100644
--- a/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java
@@ -15,56 +15,14 @@
  */
 package org.onosproject.cluster;
 
-import org.onlab.packet.IpAddress;
-
-import java.util.Set;
-
 /**
  * Service for administering the cluster node membership.
+ * <p>
+ * This service has a view of the cluster membership that is isolated to the local node's version during upgrades.
+ * For an equivalent service that has control over all nodes during an upgrade use
+ * {@link UnifiedClusterAdminService}.
+ *
+ * @see UnifiedClusterAdminService
  */
-public interface ClusterAdminService extends ClusterService {
-
-    /**
-     * Forms cluster configuration based on the specified set of node
-     * information.&nbsp; This method resets and restarts the controller
-     * instance.
-     *
-     * @param nodes    set of nodes that form the cluster
-     */
-    void formCluster(Set<ControllerNode> nodes);
-
-    /**
-     * Forms cluster configuration based on the specified set of node
-     * information.&nbsp; This method resets and restarts the controller
-     * instance.
-     *
-     * @param nodes    set of nodes that form the cluster
-     * @param partitionSize number of nodes to compose a partition
-     */
-    void formCluster(Set<ControllerNode> nodes, int partitionSize);
-
-    /**
-     * Adds a new controller node to the cluster.
-     *
-     * @param nodeId  controller node identifier
-     * @param ip      node IP listen address
-     * @param tcpPort tcp listen port
-     * @return newly added node
-     */
-    ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort);
-
-    /**
-     * Removes the specified node from the cluster node list.
-     *
-     * @param nodeId controller node identifier
-     */
-    void removeNode(NodeId nodeId);
-
-    /**
-     * Marks the current node as fully started or not.
-     *
-     * @param started true indicates all components have been started
-     */
-    void markFullyStarted(boolean started);
-
+public interface ClusterAdminService extends MembershipAdminService {
 }
diff --git a/core/api/src/main/java/org/onosproject/cluster/ClusterService.java b/core/api/src/main/java/org/onosproject/cluster/ClusterService.java
index 964357f..f17e4d9 100644
--- a/core/api/src/main/java/org/onosproject/cluster/ClusterService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/ClusterService.java
@@ -15,65 +15,13 @@
  */
 package org.onosproject.cluster;
 
-import java.util.Set;
-
-import org.joda.time.DateTime;
-import org.onosproject.core.Version;
-import org.onosproject.event.ListenerService;
-
 /**
- * Service for obtaining information about the individual nodes within
- * the controller cluster.
+ * Service for obtaining information about the individual nodes within the controller cluster.
+ * <p>
+ * This service's view of the nodes in the cluster is isolated to a single version of the software. During upgrades,
+ * when multiple versions of the software are running in the same cluster, users of this service will only be able
+ * to see nodes running the same version as the local node. This is useful for limiting communication to nodes running
+ * the same version of the software.
  */
-public interface ClusterService
-    extends ListenerService<ClusterEvent, ClusterEventListener> {
-
-    /**
-     * Returns the local controller node.
-     *
-     * @return local controller node
-     */
-    ControllerNode getLocalNode();
-
-    /**
-     * Returns the set of current cluster members.
-     *
-     * @return set of cluster members
-     */
-    Set<ControllerNode> getNodes();
-
-    /**
-     * Returns the specified controller node.
-     *
-     * @param nodeId controller node identifier
-     * @return controller node
-     */
-    ControllerNode getNode(NodeId nodeId);
-
-    /**
-     * Returns the availability state of the specified controller node. Note
-     * that this does not imply that all the core and application components
-     * have been fully activated; only that the node has joined the cluster.
-     *
-     * @param nodeId controller node identifier
-     * @return availability state
-     */
-    ControllerNode.State getState(NodeId nodeId);
-
-    /**
-     * Returns the version of the given controller node.
-     *
-     * @param nodeId controller node identifier
-     * @return controller version
-     */
-    Version getVersion(NodeId nodeId);
-
-    /**
-     * Returns the system time when the availability state was last updated.
-     *
-     * @param nodeId controller node identifier
-     * @return system time when the availability state was last updated.
-     */
-    DateTime getLastUpdated(NodeId nodeId);
-
+public interface ClusterService extends MembershipService {
 }
diff --git a/core/api/src/main/java/org/onosproject/cluster/MembershipAdminService.java b/core/api/src/main/java/org/onosproject/cluster/MembershipAdminService.java
new file mode 100644
index 0000000..908a963
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/MembershipAdminService.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import java.util.Set;
+
+import org.onlab.packet.IpAddress;
+
+/**
+ * Service for administering the cluster node membership.
+ */
+public interface MembershipAdminService extends MembershipService {
+
+    /**
+     * Forms cluster configuration based on the specified set of node
+     * information.&nbsp; This method resets and restarts the controller
+     * instance.
+     *
+     * @param nodes    set of nodes that form the cluster
+     */
+    void formCluster(Set<ControllerNode> nodes);
+
+    /**
+     * Forms cluster configuration based on the specified set of node
+     * information.&nbsp; This method resets and restarts the controller
+     * instance.
+     *
+     * @param nodes    set of nodes that form the cluster
+     * @param partitionSize number of nodes to compose a partition
+     */
+    void formCluster(Set<ControllerNode> nodes, int partitionSize);
+
+    /**
+     * Adds a new controller node to the cluster.
+     *
+     * @param nodeId  controller node identifier
+     * @param ip      node IP listen address
+     * @param tcpPort tcp listen port
+     * @return newly added node
+     */
+    ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort);
+
+    /**
+     * Removes the specified node from the cluster node list.
+     *
+     * @param nodeId controller node identifier
+     */
+    void removeNode(NodeId nodeId);
+
+    /**
+     * Marks the current node as fully started or not.
+     *
+     * @param started true indicates all components have been started
+     */
+    void markFullyStarted(boolean started);
+
+}
diff --git a/core/api/src/main/java/org/onosproject/cluster/MembershipService.java b/core/api/src/main/java/org/onosproject/cluster/MembershipService.java
new file mode 100644
index 0000000..7b366f6
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/MembershipService.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import java.util.Set;
+
+import org.joda.time.DateTime;
+import org.onosproject.core.Version;
+import org.onosproject.event.ListenerService;
+
+/**
+ * Service for obtaining information about the individual nodes within
+ * the controller cluster.
+ */
+public interface MembershipService
+    extends ListenerService<ClusterEvent, ClusterEventListener> {
+
+    /**
+     * Returns the local controller node.
+     *
+     * @return local controller node
+     */
+    ControllerNode getLocalNode();
+
+    /**
+     * Returns the set of current cluster members.
+     *
+     * @return set of cluster members
+     */
+    Set<ControllerNode> getNodes();
+
+    /**
+     * Returns the specified controller node.
+     *
+     * @param nodeId controller node identifier
+     * @return controller node
+     */
+    ControllerNode getNode(NodeId nodeId);
+
+    /**
+     * Returns the availability state of the specified controller node. Note
+     * that this does not imply that all the core and application components
+     * have been fully activated; only that the node has joined the cluster.
+     *
+     * @param nodeId controller node identifier
+     * @return availability state
+     */
+    ControllerNode.State getState(NodeId nodeId);
+
+    /**
+     * Returns the version of the given controller node.
+     *
+     * @param nodeId controller node identifier
+     * @return controller version
+     */
+    Version getVersion(NodeId nodeId);
+
+    /**
+     * Returns the system time when the availability state was last updated.
+     *
+     * @param nodeId controller node identifier
+     * @return system time when the availability state was last updated.
+     */
+    DateTime getLastUpdated(NodeId nodeId);
+
+}
diff --git a/core/api/src/main/java/org/onosproject/cluster/PartitionId.java b/core/api/src/main/java/org/onosproject/cluster/PartitionId.java
index f8b65e7..39852de 100644
--- a/core/api/src/main/java/org/onosproject/cluster/PartitionId.java
+++ b/core/api/src/main/java/org/onosproject/cluster/PartitionId.java
@@ -25,6 +25,11 @@
 public class PartitionId extends Identifier<Integer> implements Comparable<PartitionId> {
 
     /**
+     * The {@code PartitionId} for the shared coordination partition.
+     */
+    public static final PartitionId SHARED = PartitionId.from(0);
+
+    /**
      * Creates a partition identifier from an integer.
      *
      * @param id input integer
diff --git a/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterAdminService.java b/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterAdminService.java
new file mode 100644
index 0000000..9fb7407
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterAdminService.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Cluster membership administration service that supports modification of all nodes during an upgrade.
+ */
+@Beta
+public interface UnifiedClusterAdminService extends MembershipAdminService {
+}
diff --git a/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterService.java b/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterService.java
new file mode 100644
index 0000000..f690f3e
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/UnifiedClusterService.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Unified multi-version cluster membership service.
+ * <p>
+ * During upgrades, the nodes within a cluster may be running multiple versions of the software.
+ * This service has a view of the entire cluster running any version. Users of this service must be careful when
+ * communicating with nodes described by this service as compatibility issues can result from communicating across
+ * versions. For an equivalent service that has an isolated view of the cluster, see {@link ClusterService}.
+ *
+ * @see ClusterService
+ */
+@Beta
+public interface UnifiedClusterService extends MembershipService {
+}
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index 4a46885..dfc558d 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -15,152 +15,15 @@
  */
 package org.onosproject.store.cluster.messaging;
 
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-import org.onosproject.cluster.NodeId;
-
 /**
  * Service for assisting communications between controller cluster nodes.
+ * <p>
+ * Communication via this service is isolated to nodes running a single version of the software. During upgrades, when
+ * nodes may be running multiple versions simultaneously, this service prevents nodes running different versions of
+ * the software from communicating with each other, thus avoiding compatibility issues. For an equivalent cross-version
+ * compatible service, see {@link UnifiedClusterCommunicationService}.
+ *
+ * @see UnifiedClusterCommunicationService
  */
-public interface ClusterCommunicationService {
-
-    /**
-     * Adds a new subscriber for the specified message subject.
-     *
-     * @param subject    message subject
-     * @param subscriber message subscriber
-     * @param executor executor to use for running handler.
-     * @deprecated in Cardinal Release
-     */
-    @Deprecated
-    void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor);
-
-    /**
-     * Broadcasts a message to all controller nodes.
-     *
-     * @param message message to send
-     * @param subject message subject
-     * @param encoder function for encoding message to byte[]
-     * @param <M> message type
-     */
-    <M> void broadcast(M message,
-                       MessageSubject subject,
-                       Function<M, byte[]> encoder);
-
-    /**
-     * Broadcasts a message to all controller nodes including self.
-     *
-     * @param message message to send
-     * @param subject message subject
-     * @param encoder function for encoding message to byte[]
-     * @param <M> message type
-     */
-    <M> void broadcastIncludeSelf(M message,
-                                  MessageSubject subject,
-                                  Function<M, byte[]> encoder);
-
-    /**
-     * Sends a message to the specified controller node.
-     *
-     * @param message message to send
-     * @param subject message subject
-     * @param encoder function for encoding message to byte[]
-     * @param toNodeId destination node identifier
-     * @param <M> message type
-     * @return future that is completed when the message is sent
-     */
-    <M> CompletableFuture<Void> unicast(M message,
-                        MessageSubject subject,
-                        Function<M, byte[]> encoder,
-                        NodeId toNodeId);
-
-    /**
-     * Multicasts a message to a set of controller nodes.
-     *
-     * @param message message to send
-     * @param subject message subject
-     * @param encoder function for encoding message to byte[]
-     * @param nodeIds  recipient node identifiers
-     * @param <M> message type
-     */
-    <M> void multicast(M message,
-                       MessageSubject subject,
-                       Function<M, byte[]> encoder,
-                       Set<NodeId> nodeIds);
-
-    /**
-     * Sends a message and expects a reply.
-     *
-     * @param message message to send
-     * @param subject message subject
-     * @param encoder function for encoding request to byte[]
-     * @param decoder function for decoding response from byte[]
-     * @param toNodeId recipient node identifier
-     * @param <M> request type
-     * @param <R> reply type
-     * @return reply future
-     */
-    <M, R> CompletableFuture<R> sendAndReceive(M message,
-                                               MessageSubject subject,
-                                               Function<M, byte[]> encoder,
-                                               Function<byte[], R> decoder,
-                                               NodeId toNodeId);
-
-    /**
-     * Adds a new subscriber for the specified message subject.
-     *
-     * @param subject message subject
-     * @param decoder decoder for resurrecting incoming message
-     * @param handler handler function that processes the incoming message and produces a reply
-     * @param encoder encoder for serializing reply
-     * @param executor executor to run this handler on
-     * @param <M> incoming message type
-     * @param <R> reply message type
-     */
-    <M, R> void addSubscriber(MessageSubject subject,
-                              Function<byte[], M> decoder,
-                              Function<M, R> handler,
-                              Function<R, byte[]> encoder,
-                              Executor executor);
-
-    /**
-     * Adds a new subscriber for the specified message subject.
-     *
-     * @param subject message subject
-     * @param decoder decoder for resurrecting incoming message
-     * @param handler handler function that processes the incoming message and produces a reply
-     * @param encoder encoder for serializing reply
-     * @param <M> incoming message type
-     * @param <R> reply message type
-     */
-    <M, R> void addSubscriber(MessageSubject subject,
-                              Function<byte[], M> decoder,
-                              Function<M, CompletableFuture<R>> handler,
-                              Function<R, byte[]> encoder);
-
-    /**
-     * Adds a new subscriber for the specified message subject.
-     *
-     * @param subject message subject
-     * @param decoder decoder to resurrecting incoming message
-     * @param handler handler for handling message
-     * @param executor executor to run this handler on
-     * @param <M> incoming message type
-     */
-    <M> void addSubscriber(MessageSubject subject,
-                           Function<byte[], M> decoder,
-                           Consumer<M> handler,
-                           Executor executor);
-
-    /**
-     * Removes a subscriber for the specified message subject.
-     *
-     * @param subject message subject
-     */
-    void removeSubscriber(MessageSubject subject);
+public interface ClusterCommunicationService extends ClusterCommunicator {
 }
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicator.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicator.java
new file mode 100644
index 0000000..7d2480e
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicator.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.messaging;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.onosproject.cluster.NodeId;
+
+/**
+ * Service for assisting communications between controller cluster nodes.
+ */
+public interface ClusterCommunicator {
+
+    /**
+     * Adds a new subscriber for the specified message subject.
+     *
+     * @param subject    message subject
+     * @param subscriber message subscriber
+     * @param executor executor to use for running handler.
+     * @deprecated in Cardinal Release
+     */
+    @Deprecated
+    void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor);
+
+    /**
+     * Broadcasts a message to all controller nodes.
+     *
+     * @param message message to send
+     * @param subject message subject
+     * @param encoder function for encoding message to byte[]
+     * @param <M> message type
+     */
+    <M> void broadcast(M message,
+            MessageSubject subject,
+            Function<M, byte[]> encoder);
+
+    /**
+     * Broadcasts a message to all controller nodes including self.
+     *
+     * @param message message to send
+     * @param subject message subject
+     * @param encoder function for encoding message to byte[]
+     * @param <M> message type
+     */
+    <M> void broadcastIncludeSelf(M message,
+            MessageSubject subject,
+            Function<M, byte[]> encoder);
+
+    /**
+     * Sends a message to the specified controller node.
+     *
+     * @param message message to send
+     * @param subject message subject
+     * @param encoder function for encoding message to byte[]
+     * @param toNodeId destination node identifier
+     * @param <M> message type
+     * @return future that is completed when the message is sent
+     */
+    <M> CompletableFuture<Void> unicast(M message,
+            MessageSubject subject,
+            Function<M, byte[]> encoder,
+            NodeId toNodeId);
+
+    /**
+     * Multicasts a message to a set of controller nodes.
+     *
+     * @param message message to send
+     * @param subject message subject
+     * @param encoder function for encoding message to byte[]
+     * @param nodeIds  recipient node identifiers
+     * @param <M> message type
+     */
+    <M> void multicast(M message,
+            MessageSubject subject,
+            Function<M, byte[]> encoder,
+            Set<NodeId> nodeIds);
+
+    /**
+     * Sends a message and expects a reply.
+     *
+     * @param message message to send
+     * @param subject message subject
+     * @param encoder function for encoding request to byte[]
+     * @param decoder function for decoding response from byte[]
+     * @param toNodeId recipient node identifier
+     * @param <M> request type
+     * @param <R> reply type
+     * @return reply future
+     */
+    <M, R> CompletableFuture<R> sendAndReceive(M message,
+            MessageSubject subject,
+            Function<M, byte[]> encoder,
+            Function<byte[], R> decoder,
+            NodeId toNodeId);
+
+    /**
+     * Adds a new subscriber for the specified message subject.
+     *
+     * @param subject message subject
+     * @param decoder decoder for resurrecting incoming message
+     * @param handler handler function that processes the incoming message and produces a reply
+     * @param encoder encoder for serializing reply
+     * @param executor executor to run this handler on
+     * @param <M> incoming message type
+     * @param <R> reply message type
+     */
+    <M, R> void addSubscriber(MessageSubject subject,
+            Function<byte[], M> decoder,
+            Function<M, R> handler,
+            Function<R, byte[]> encoder,
+            Executor executor);
+
+    /**
+     * Adds a new subscriber for the specified message subject.
+     *
+     * @param subject message subject
+     * @param decoder decoder for resurrecting incoming message
+     * @param handler handler function that processes the incoming message and produces a reply
+     * @param encoder encoder for serializing reply
+     * @param <M> incoming message type
+     * @param <R> reply message type
+     */
+    <M, R> void addSubscriber(MessageSubject subject,
+            Function<byte[], M> decoder,
+            Function<M, CompletableFuture<R>> handler,
+            Function<R, byte[]> encoder);
+
+    /**
+     * Adds a new subscriber for the specified message subject.
+     *
+     * @param subject message subject
+     * @param decoder decoder to resurrecting incoming message
+     * @param handler handler for handling message
+     * @param executor executor to run this handler on
+     * @param <M> incoming message type
+     */
+    <M> void addSubscriber(MessageSubject subject,
+            Function<byte[], M> decoder,
+            Consumer<M> handler,
+            Executor executor);
+
+    /**
+     * Removes a subscriber for the specified message subject.
+     *
+     * @param subject message subject
+     */
+    void removeSubscriber(MessageSubject subject);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/UnifiedClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/UnifiedClusterCommunicationService.java
new file mode 100644
index 0000000..042c803
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/UnifiedClusterCommunicationService.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.messaging;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Service for unified communication across controller nodes running multiple software versions.
+ * <p>
+ * This service supports communicating across nodes running different versions of the software simultaneously. During
+ * upgrades, when nodes may be running a mixture of versions, this service can be used to coordinate across those
+ * versions. But users of this service must be extremely careful to preserve backward/forward compatibility for
+ * messages sent across versions. Encoders and decoders used for messages sent/received on this service should
+ * support evolving schemas.
+ */
+@Beta
+public interface UnifiedClusterCommunicationService extends ClusterCommunicator {
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java
index a9547c9..adca843 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java
@@ -22,6 +22,11 @@
  */
 public interface AtomicValue<V> extends DistributedPrimitive  {
 
+    @Override
+    default Type primitiveType() {
+        return Type.VALUE;
+    }
+
     /**
      * Atomically sets the value to the given updated value if the current value is equal to the expected value.
      * <p>
diff --git a/core/api/src/main/java/org/onosproject/store/service/CoordinationService.java b/core/api/src/main/java/org/onosproject/store/service/CoordinationService.java
new file mode 100644
index 0000000..8725a06
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/CoordinationService.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Cross-version storage/coordination service.
+ * <p>
+ * This is a special type of {@link PrimitiveService} that differs semantically from {@link StorageService} in that
+ * it supports cross-version backward/forward compatible storage. During upgrades, when nodes are running different
+ * versions of the software, this service guarantees that cross-version compatibility will be maintained and provides
+ * shared compatible primitives for coordinating across versions. Users must ensure that all objects stored in
+ * primitives created via this service are stored using a serialization format that is backward/forward compatible,
+ * e.g. using {@link org.onlab.util.KryoNamespace.Builder#setCompatible(boolean)}.
+ *
+ * @see org.onlab.util.KryoNamespace.Builder#setCompatible(boolean)
+ * @see StorageService
+ */
+@Beta
+public interface CoordinationService extends PrimitiveService {
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/PartitionInfo.java b/core/api/src/main/java/org/onosproject/store/service/PartitionInfo.java
index fb903b8..c3c3835 100644
--- a/core/api/src/main/java/org/onosproject/store/service/PartitionInfo.java
+++ b/core/api/src/main/java/org/onosproject/store/service/PartitionInfo.java
@@ -15,15 +15,16 @@
  */
 package org.onosproject.store.service;
 
-import com.google.common.collect.ImmutableList;
-
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
+import org.onosproject.cluster.PartitionId;
+
 /**
  * Contains information about a database partition.
  */
 public class PartitionInfo {
-    private final String name;
+    private final PartitionId partitionId;
     private final long term;
     private final List<String> members;
     private final String leader;
@@ -31,25 +32,25 @@
     /**
      * Class constructor.
      *
-     * @param name partition name
+     * @param partitionId partition identifier
      * @param term term number
      * @param members partition members
      * @param leader leader name
      */
-    public PartitionInfo(String name, long term, List<String> members, String leader) {
-        this.name = name;
+    public PartitionInfo(PartitionId partitionId, long term, List<String> members, String leader) {
+        this.partitionId = partitionId;
         this.term = term;
         this.members = ImmutableList.copyOf(members);
         this.leader = leader;
     }
 
     /**
-     * Returns the name of the partition.
+     * Returns the partition ID.
      *
-     * @return partition name
+     * @return partition ID
      */
-    public String name() {
-        return name;
+    public PartitionId id() {
+        return partitionId;
     }
 
     /**
diff --git a/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java b/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java
new file mode 100644
index 0000000..7be0480
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/PrimitiveService.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+/**
+ * Primitive service.
+ * <p>
+ * This service provides builders for various distributed primitives.
+ * <p>
+ * It is expected that services and applications will leverage the primitives indirectly provided by
+ * this service for their distributed state management and coordination.
+ */
+public interface PrimitiveService {
+
+    /**
+     * Creates a new EventuallyConsistentMapBuilder.
+     *
+     * @param <K> key type
+     * @param <V> value type
+     * @return builder for an eventually consistent map
+     */
+    <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder();
+
+    /**
+     * Creates a new ConsistentMapBuilder.
+     *
+     * @param <K> key type
+     * @param <V> value type
+     * @return builder for a consistent map
+     */
+    <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder();
+
+    /**
+     * Creates a new ConsistentMapBuilder.
+     *
+     * @param <V> value type
+     * @return builder for a document tree
+     */
+    <V> DocumentTreeBuilder<V> documentTreeBuilder();
+
+    /**
+     * Creates a new {@code AsyncConsistentTreeMapBuilder}.
+     *
+     * @param <V> value type
+     * @return builder for a consistent tree map
+     */
+    <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder();
+
+    /**
+     * Creates a new {@code AsyncConsistentSetMultimapBuilder}.
+     *
+     * @param <K> key type
+     * @param <V> value type
+     * @return builder for a set based consistent multimap
+     */
+    <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder();
+
+    /**
+     * Creates a new {@code AtomicCounterMapBuilder}.
+     *
+     * @param <K> key type
+     * @return builder for an atomic counter map
+     */
+    <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder();
+
+    /**
+     * Creates a new DistributedSetBuilder.
+     *
+     * @param <E> set element type
+     * @return builder for a distributed set
+     */
+    <E> DistributedSetBuilder<E> setBuilder();
+
+    /**
+     * Creates a new AtomicCounterBuilder.
+     *
+     * @return atomic counter builder
+     */
+    AtomicCounterBuilder atomicCounterBuilder();
+
+    /**
+     * Creates a new AtomicIdGeneratorBuilder.
+     *
+     * @return atomic ID generator builder
+     */
+    AtomicIdGeneratorBuilder atomicIdGeneratorBuilder();
+
+    /**
+     * Creates a new AtomicValueBuilder.
+     *
+     * @param <V> atomic value type
+     * @return atomic value builder
+     */
+    <V> AtomicValueBuilder<V> atomicValueBuilder();
+
+    /**
+     * Creates a new LeaderElectorBuilder.
+     *
+     * @return leader elector builder
+     */
+    LeaderElectorBuilder leaderElectorBuilder();
+
+    /**
+     * Creates a new transaction context builder.
+     *
+     * @return a builder for a transaction context.
+     */
+    TransactionContextBuilder transactionContextBuilder();
+
+    /**
+     * Returns an instance of {@code AsyncAtomicCounter} with specified name.
+     * @param name counter name
+     *
+     * @return AsyncAtomicCounter instance
+     */
+    default AsyncAtomicCounter getAsyncAtomicCounter(String name) {
+        return atomicCounterBuilder().withName(name).build();
+    }
+
+    /**
+     * Returns an instance of {@code AsyncAtomicIdGenerator} with specified name.
+     *
+     * @param name ID generator name
+     * @return AsyncAtomicIdGenerator instance
+     */
+    default AsyncAtomicIdGenerator getAsyncAtomicIdGenerator(String name) {
+        return atomicIdGeneratorBuilder().withName(name).build();
+    }
+
+    /**
+     * Returns an instance of {@code AtomicCounter} with specified name.
+     * @param name counter name
+     *
+     * @return AtomicCounter instance
+     */
+    default AtomicCounter getAtomicCounter(String name) {
+        return getAsyncAtomicCounter(name).asAtomicCounter();
+    }
+
+    /**
+     * Returns an instance of {@code AtomicIdGenerator} with specified name.
+     *
+     * @param name ID generator name
+     * @return AtomicIdGenerator instance
+     */
+    default AtomicIdGenerator getAtomicIdGenerator(String name) {
+        return getAsyncAtomicIdGenerator(name).asAtomicIdGenerator();
+    }
+
+    /**
+     * Returns an instance of {@code WorkQueue} with specified name.
+     *
+     * @param <E> work element type
+     * @param name work queue name
+     * @param serializer serializer
+     * @return WorkQueue instance
+     */
+    <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer);
+
+    /**
+     * Returns an instance of {@code AsyncDocumentTree} with specified name.
+     *
+     * @param <V> tree node value type
+     * @param name document tree name
+     * @param serializer serializer
+     * @return AsyncDocumentTree instance
+     */
+    <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer);
+
+     /** Returns a set backed instance of {@code AsyncConsistentMultimap} with
+     * the specified name.
+     *
+     * @param name the multimap name
+     * @param serializer serializer
+     * @param <K> key type
+     * @param <V> value type
+     * @return set backed {@code AsyncConsistentMultimap} instance
+     */
+    <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(String name,
+            Serializer serializer);
+
+    /**
+     * Returns an instance of {@code AsyncConsistentTreeMap} with the specified
+     * name.
+     *
+     * @param name the treemap name
+     * @param serializer serializer
+     * @param <V> value type
+     * @return set backed {@code AsyncConsistentTreeMap} instance
+     */
+    <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(String name,
+            Serializer serializer);
+
+    /**
+     * Returns an instance of {@code Topic} with specified name.
+     *
+     * @param <T> topic message type
+     * @param name topic name
+     * @param serializer serializer
+     *
+     * @return Topic instance
+     */
+    <T> Topic<T> getTopic(String name, Serializer serializer);
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index d863983..94843fc 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -23,195 +23,5 @@
  * It is expected that services and applications will leverage the primitives indirectly provided by
  * this service for their distributed state management and coordination.
  */
-public interface StorageService {
-
-    /**
-     * Creates a new EventuallyConsistentMapBuilder.
-     *
-     * @param <K> key type
-     * @param <V> value type
-     * @return builder for an eventually consistent map
-     */
-    <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder();
-
-    /**
-     * Creates a new ConsistentMapBuilder.
-     *
-     * @param <K> key type
-     * @param <V> value type
-     * @return builder for a consistent map
-     */
-    <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder();
-
-    /**
-     * Creates a new ConsistentMapBuilder.
-     *
-     * @param <V> value type
-     * @return builder for a consistent map
-     */
-    <V> DocumentTreeBuilder<V> documentTreeBuilder();
-
-    /**
-     * Creates a new {@code AsyncConsistentTreeMapBuilder}.
-     *
-     * @param <V> value type
-     * @return builder for a async consistent tree map
-     */
-    <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder();
-
-    /**
-     * Creates a new {@code AsyncConsistentSetMultimapBuilder}.
-     *
-     * @param <K> key type
-     * @param <V> value type
-     * @return builder for a set based async consistent multimap
-     */
-    <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder();
-
-    /**
-     * Creates a new {@code AtomicCounterMapBuilder}.
-     *
-     * @param <K> key type
-     * @return builder for an atomic counter map
-     */
-    <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder();
-
-    /**
-     * Creates a new DistributedSetBuilder.
-     *
-     * @param <E> set element type
-     * @return builder for an distributed set
-     */
-    <E> DistributedSetBuilder<E> setBuilder();
-
-    /**
-     * Creates a new AtomicCounterBuilder.
-     *
-     * @return atomic counter builder
-     */
-    AtomicCounterBuilder atomicCounterBuilder();
-
-    /**
-     * Creates a new AtomicIdGeneratorBuilder.
-     *
-     * @return atomic ID generator builder
-     */
-    AtomicIdGeneratorBuilder atomicIdGeneratorBuilder();
-
-    /**
-     * Creates a new AtomicValueBuilder.
-     *
-     * @param <V> atomic value type
-     * @return atomic value builder
-     */
-    <V> AtomicValueBuilder<V> atomicValueBuilder();
-
-    /**
-     * Creates a new LeaderElectorBuilder.
-     *
-     * @return leader elector builder
-     */
-    LeaderElectorBuilder leaderElectorBuilder();
-
-    /**
-     * Creates a new transaction context builder.
-     *
-     * @return a builder for a transaction context.
-     */
-    TransactionContextBuilder transactionContextBuilder();
-
-    /**
-     * Returns an instance of {@code AsyncAtomicCounter} with specified name.
-     * @param name counter name
-     *
-     * @return AsyncAtomicCounter instance
-     */
-    default AsyncAtomicCounter getAsyncAtomicCounter(String name) {
-        return atomicCounterBuilder().withName(name).build();
-    }
-
-    /**
-     * Returns an instance of {@code AsyncAtomicIdGenerator} with specified name.
-     *
-     * @param name ID generator name
-     * @return AsyncAtomicIdGenerator instance
-     */
-    default AsyncAtomicIdGenerator getAsyncAtomicIdGenerator(String name) {
-        return atomicIdGeneratorBuilder().withName(name).build();
-    }
-
-    /**
-     * Returns an instance of {@code AtomicCounter} with specified name.
-     * @param name counter name
-     *
-     * @return AtomicCounter instance
-     */
-    default AtomicCounter getAtomicCounter(String name) {
-        return getAsyncAtomicCounter(name).asAtomicCounter();
-    }
-
-    /**
-     * Returns an instance of {@code AtomicIdGenerator} with specified name.
-     *
-     * @param name ID generator name
-     * @return AtomicIdGenerator instance
-     */
-    default AtomicIdGenerator getAtomicIdGenerator(String name) {
-        return getAsyncAtomicIdGenerator(name).asAtomicIdGenerator();
-    }
-
-    /**
-     * Returns an instance of {@code WorkQueue} with specified name.
-     *
-     * @param <E> work element type
-     * @param name work queue name
-     * @param serializer serializer
-     * @return WorkQueue instance
-     */
-    <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer);
-
-    /**
-     * Returns an instance of {@code AsyncDocumentTree} with specified name.
-     *
-     * @param <V> tree node value type
-     * @param name document tree name
-     * @param serializer serializer
-     * @return AsyncDocumentTree instance
-     */
-    <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer);
-
-     /** Returns a set backed instance of {@code AsyncConsistentMultimap} with
-     * the specified name.
-     *
-     * @param name the multimap name
-     * @param serializer serializer
-     * @param <K> key type
-     * @param <V> value type
-     * @return set backed {@code AsyncConsistentMultimap} instance
-     */
-    <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(String name,
-                                                             Serializer serializer);
-
-    /**
-     * Returns an instance of {@code AsyncConsistentTreeMap} with the specified
-     * name.
-     *
-     * @param name the treemap name
-     * @param serializer serializer
-     * @param <V> value type
-     * @return set backed {@code AsyncConsistentTreeMap} instance
-     */
-    <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(String name,
-                                                  Serializer serializer);
-
-    /**
-     * Returns an instance of {@code Topic} with specified name.
-     *
-     * @param <T> topic message type
-     * @param name topic name
-     * @param serializer serializer
-     *
-     * @return Topic instance
-     */
-    <T> Topic<T> getTopic(String name, Serializer serializer);
+public interface StorageService extends PrimitiveService {
 }
diff --git a/core/api/src/main/java/org/onosproject/upgrade/Upgrade.java b/core/api/src/main/java/org/onosproject/upgrade/Upgrade.java
new file mode 100644
index 0000000..79fa7ba
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/upgrade/Upgrade.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.upgrade;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.core.Version;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Represents the state of an upgrade.
+ * <p>
+ * An upgrade consists of a {@link #source() source} and {@link #target() target} version and an upgrade
+ * {@link #status()}.
+ */
+@Beta
+public class Upgrade {
+
+    /**
+     * Represents the phase of the upgrade protocol.
+     */
+    @Beta
+    public enum Status {
+
+        /**
+         * Represents state in which no upgrade has been initialized.
+         */
+        INACTIVE(false, false),
+
+        /**
+         * Indicates that an upgrade is being initialized.
+         */
+        INITIALIZING(true, false),
+
+        /**
+         * Indicates that an upgrade has been initialized.
+         */
+        INITIALIZED(true, false),
+
+        /**
+         * Indicates that an upgrade is in progress.
+         */
+        UPGRADING(true, true),
+
+        /**
+         * Indicates that an upgrade is complete.
+         */
+        UPGRADED(true, true),
+
+        /**d
+         * Indicates that an upgrade is being committed.
+         */
+        COMMITTING(true, true),
+
+        /**
+         * Indicates that an upgrade has been committed.
+         */
+        COMMITTED(false, true),
+
+        /**
+         * Indicates that an upgrade is being rolled back.
+         */
+        ROLLING_BACK(true, false),
+
+        /**
+         * Indicates that an upgrade has been rolled back.
+         */
+        ROLLED_BACK(true, false),
+
+        /**
+         * Indicates that an upgrade is being reset.
+         */
+        RESETTING(true, false),
+
+        /**
+         * Indicates that an upgrade has been reset.
+         */
+        RESET(false, false);
+
+        private final boolean active;
+        private final boolean upgraded;
+
+        Status(boolean active, boolean upgraded) {
+            this.active = active;
+            this.upgraded = upgraded;
+        }
+
+        /**
+         * Returns whether the upgrade status is active.
+         *
+         * @return whether the upgrade status is active
+         */
+        public boolean active() {
+            return active;
+        }
+
+        /**
+         * Returns whether the upgraded version is active.
+         *
+         * @return whether the upgraded version is active
+         */
+        public boolean upgraded() {
+            return upgraded;
+        }
+    }
+
+    private final Version source;
+    private final Version target;
+    private final Status status;
+
+    public Upgrade(Version source, Version target, Status status) {
+        this.source = source;
+        this.target = target;
+        this.status = status;
+    }
+
+    /**
+     * Returns the source version.
+     *
+     * @return the source version
+     */
+    public Version source() {
+        return source;
+    }
+
+    /**
+     * Returns the target version.
+     *
+     * @return the target version
+     */
+    public Version target() {
+        return target;
+    }
+
+    /**
+     * Returns the upgrade status.
+     *
+     * @return the upgrade status
+     */
+    public Status status() {
+        return status;
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+                .add("source", source)
+                .add("target", target)
+                .add("status", status)
+                .toString();
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/upgrade/UpgradeAdminService.java b/core/api/src/main/java/org/onosproject/upgrade/UpgradeAdminService.java
new file mode 100644
index 0000000..38c1bb1
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/upgrade/UpgradeAdminService.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.upgrade;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Abstraction for executing the stages of the upgrade process.
+ * <p>
+ * Upgrades are performed in three phases:
+ * <ul>
+ *     <li>{@code initialize} - Initializes an upgrade</li>
+ *     <li>{@code upgrade} - Performs an upgrade, transferring device mastership from the current version to the
+ *     upgraded version</li>
+ *     <li>{@code commit} or {@code rollback} - Completes or rolls back an upgrade, transferring mastership back
+ *     to nodes running the previous version</li>
+ * </ul>
+ */
+@Beta
+public interface UpgradeAdminService {
+
+    /**
+     * Initializes an upgrade.
+     * <p>
+     * This method must be called to initialize an upgrade and prior to physically upgrading any nodes.
+     *
+     * @throws IllegalStateException if an upgrade is already in progress
+     */
+    void initialize();
+
+    /**
+     * Performs an upgrade, transferring device mastership to upgraded nodes.
+     * <p>
+     * This method transfers mastership from the current version of the software to the upgraded version. Thus,
+     * a subset of the nodes in the cluster must have been physically upgraded and restarted prior to executing this
+     * phase of the upgrade protocol.
+     *
+     * @throws IllegalStateException if no upgrade has been initialized
+     */
+    void upgrade();
+
+    /**
+     * Commits an upgrade.
+     * <p>
+     * Completes the upgrade process, committing the new cluster version.
+     *
+     * @throws IllegalStateException if no upgrade is in progress or not all nodes have been upgraded
+     */
+    void commit();
+
+    /**
+     * Rolls back an upgrade.
+     * <p>
+     * When an upgrade is rolled back, mastership is transferred from upgraded nodes back to nodes running the
+     * version of the software prior to the upgrade.
+     *
+     * @throws IllegalStateException if no upgrade is in progress
+     */
+    void rollback();
+
+    /**
+     * Resets an upgrade.
+     * <p>
+     * When an upgrade is rolled back, once nodes have been restored to the previos version the upgrade must be reset
+     * to restore the upgrade state to {@link Upgrade.Status#INACTIVE}.
+     *
+     * @throws IllegalStateException if nodes have not been restored to the previous state
+     */
+    void reset();
+
+}
diff --git a/core/api/src/main/java/org/onosproject/upgrade/UpgradeEvent.java b/core/api/src/main/java/org/onosproject/upgrade/UpgradeEvent.java
new file mode 100644
index 0000000..43cf523
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/upgrade/UpgradeEvent.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.upgrade;
+
+import java.util.Objects;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.MoreObjects;
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Upgrade event.
+ */
+@Beta
+public class UpgradeEvent extends AbstractEvent<UpgradeEvent.Type, Upgrade> {
+
+    /**
+     * Type of upgrade-related events.
+     */
+    @Beta
+    public enum Type {
+
+        /**
+         * Indicates that a new upgrade was initialized.
+         */
+        INITIALIZED,
+
+        /**
+         * Indicates that mastership was reassigned to the upgraded cluster.
+         */
+        UPGRADED,
+
+        /**
+         * Indicates that an upgrade was committed.
+         */
+        COMMITTED,
+
+        /**
+         * Indicates that an upgrade was rolled back.
+         */
+        ROLLED_BACK,
+
+        /**
+         * Indicates that an upgrade was reset.
+         */
+        RESET,
+    }
+
+    /**
+     * Creates an event of a given type and for the specified state and the
+     * current time.
+     *
+     * @param type  upgrade event type
+     * @param state upgrade state
+     */
+    public UpgradeEvent(UpgradeEvent.Type type, Upgrade state) {
+        super(type, state);
+    }
+
+    /**
+     * Creates an event of a given type and for the specified state and time.
+     *
+     * @param type  upgrade event type
+     * @param state upgrade state
+     * @param time  occurrence time
+     */
+    public UpgradeEvent(UpgradeEvent.Type type, Upgrade state, long time) {
+        super(type, state, time);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(type(), subject(), time());
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj instanceof UpgradeEvent) {
+            final UpgradeEvent other = (UpgradeEvent) obj;
+            return Objects.equals(this.type(), other.type()) &&
+                    Objects.equals(this.subject(), other.subject()) &&
+                    Objects.equals(this.time(), other.time());
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this.getClass())
+                .add("type", type())
+                .add("subject", subject())
+                .add("time", time())
+                .toString();
+    }
+
+}
diff --git a/core/api/src/main/java/org/onosproject/upgrade/UpgradeEventListener.java b/core/api/src/main/java/org/onosproject/upgrade/UpgradeEventListener.java
new file mode 100644
index 0000000..876d35a
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/upgrade/UpgradeEventListener.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.upgrade;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.event.EventListener;
+
+/**
+ * Upgrade event listener.
+ */
+@Beta
+public interface UpgradeEventListener extends EventListener<UpgradeEvent> {
+}
diff --git a/core/api/src/main/java/org/onosproject/upgrade/UpgradeService.java b/core/api/src/main/java/org/onosproject/upgrade/UpgradeService.java
new file mode 100644
index 0000000..ba5889c
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/upgrade/UpgradeService.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.upgrade;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.core.Version;
+import org.onosproject.event.ListenerService;
+
+/**
+ * Upgrade service.
+ */
+@Beta
+public interface UpgradeService
+        extends ListenerService<UpgradeEvent, UpgradeEventListener> {
+
+    /**
+     * Returns whether an upgrade is in progress.
+     * <p>
+     * An upgrade is in progress if the upgrade {@link Upgrade.Status} is active, e.g.
+     * {@link Upgrade.Status#INITIALIZED}, {@link Upgrade.Status#UPGRADED}, etc.
+     *
+     * @return indicates whether an upgrade is in progress
+     */
+    boolean isUpgrading();
+
+    /**
+     * Returns the current upgrade state.
+     *
+     * @return the current upgrade state
+     */
+    Upgrade getState();
+
+    /**
+     * Returns the currently active software version.
+     * <p>
+     * The returned version is representative of the version currently in control of the network. When the upgrade
+     * transitions to the {@link Upgrade.Status#UPGRADING} state, control over the network is transferred from
+     * {@link Upgrade#source()} nodes to {@link Upgrade#target()} nodes, and the version returned by this method
+     * represents that change.
+     *
+     * @return the software version
+     */
+    Version getVersion();
+
+    /**
+     * Returns whether the local node is active.
+     * <p>
+     * The local node will be active if its {@link Version} matches the version returned by {@link #getVersion()}.
+     *
+     * @return indicates whether the local node is active according to its version
+     */
+    boolean isLocalActive();
+
+    /**
+     * Returns whether the local node is an upgraded node.
+     *
+     * @return {@code true} if the local node's version matches {@link Upgrade#target()}
+     */
+    boolean isLocalUpgraded();
+
+}
diff --git a/core/api/src/main/java/org/onosproject/upgrade/package-info.java b/core/api/src/main/java/org/onosproject/upgrade/package-info.java
new file mode 100644
index 0000000..5f19a54
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/upgrade/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Abstractions for managing software upgrades.
+ */
+package org.onosproject.upgrade;
diff --git a/core/api/src/test/java/org/onosproject/cluster/UnifiedClusterServiceAdapter.java b/core/api/src/test/java/org/onosproject/cluster/UnifiedClusterServiceAdapter.java
new file mode 100644
index 0000000..aca74ec
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/cluster/UnifiedClusterServiceAdapter.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import java.util.Set;
+
+import org.joda.time.DateTime;
+import org.onosproject.core.Version;
+
+/**
+ * Compatible cluster service adapter.
+ */
+public class UnifiedClusterServiceAdapter implements UnifiedClusterService {
+    @Override
+    public ControllerNode getLocalNode() {
+        return null;
+    }
+
+    @Override
+    public Set<ControllerNode> getNodes() {
+        return null;
+    }
+
+    @Override
+    public ControllerNode getNode(NodeId nodeId) {
+        return null;
+    }
+
+    @Override
+    public ControllerNode.State getState(NodeId nodeId) {
+        return null;
+    }
+
+    @Override
+    public Version getVersion(NodeId nodeId) {
+        return null;
+    }
+
+    @Override
+    public DateTime getLastUpdated(NodeId nodeId) {
+        return null;
+    }
+
+    @Override
+    public void addListener(ClusterEventListener listener) {
+
+    }
+
+    @Override
+    public void removeListener(ClusterEventListener listener) {
+
+    }
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/AsyncAtomicValueAdapter.java b/core/api/src/test/java/org/onosproject/store/service/AsyncAtomicValueAdapter.java
new file mode 100644
index 0000000..0aefe8c
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/AsyncAtomicValueAdapter.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Async atomic value adapter.
+ */
+public class AsyncAtomicValueAdapter<V> implements AsyncAtomicValue<V> {
+    @Override
+    public String name() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<V> get() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<V> getAndSet(V value) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> set(V value) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
+        return null;
+    }
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/AtomicValueAdapter.java b/core/api/src/test/java/org/onosproject/store/service/AtomicValueAdapter.java
new file mode 100644
index 0000000..e6a4e2f
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/AtomicValueAdapter.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+/**
+ * Atomic value adapter.
+ */
+public class AtomicValueAdapter<V> implements AtomicValue<V> {
+    private final String name;
+
+    public AtomicValueAdapter() {
+        this(null);
+    }
+
+    public AtomicValueAdapter(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public String name() {
+        return null;
+    }
+
+    @Override
+    public Type primitiveType() {
+        return null;
+    }
+
+    @Override
+    public boolean compareAndSet(V expect, V update) {
+        return false;
+    }
+
+    @Override
+    public V get() {
+        return null;
+    }
+
+    @Override
+    public V getAndSet(V value) {
+        return null;
+    }
+
+    @Override
+    public void set(V value) {
+
+    }
+
+    @Override
+    public void addListener(AtomicValueEventListener<V> listener) {
+
+    }
+
+    @Override
+    public void removeListener(AtomicValueEventListener<V> listener) {
+
+    }
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java
new file mode 100644
index 0000000..f05d9aa
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/CoordinationServiceAdapter.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+/**
+ * Coordination service adapter.
+ */
+public class CoordinationServiceAdapter implements CoordinationService {
+    @Override
+    public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
+        return null;
+    }
+
+    @Override
+    public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
+        return null;
+    }
+
+    @Override
+    public <V> DocumentTreeBuilder<V> documentTreeBuilder() {
+        return null;
+    }
+
+    @Override
+    public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
+        return null;
+    }
+
+    @Override
+    public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
+        return null;
+    }
+
+    @Override
+    public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
+        return null;
+    }
+
+    @Override
+    public <E> DistributedSetBuilder<E> setBuilder() {
+        return null;
+    }
+
+    @Override
+    public AtomicCounterBuilder atomicCounterBuilder() {
+        return null;
+    }
+
+    @Override
+    public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
+        return null;
+    }
+
+    @Override
+    public <V> AtomicValueBuilder<V> atomicValueBuilder() {
+        return null;
+    }
+
+    @Override
+    public LeaderElectorBuilder leaderElectorBuilder() {
+        return null;
+    }
+
+    @Override
+    public TransactionContextBuilder transactionContextBuilder() {
+        return null;
+    }
+
+    @Override
+    public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
+        return null;
+    }
+
+    @Override
+    public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
+        return null;
+    }
+
+    @Override
+    public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(String name, Serializer serializer) {
+        return null;
+    }
+
+    @Override
+    public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(String name, Serializer serializer) {
+        return null;
+    }
+
+    @Override
+    public <T> Topic<T> getTopic(String name, Serializer serializer) {
+        return null;
+    }
+}
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
index 531a6fa..df7feff 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
@@ -15,7 +15,8 @@
  */
 package org.onosproject.cluster.impl;
 
-import com.google.common.collect.Sets;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -23,41 +24,19 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.apache.karaf.system.SystemService;
 import org.joda.time.DateTime;
 import org.onlab.packet.IpAddress;
-import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterAdminService;
-import org.onosproject.cluster.ClusterEvent;
 import org.onosproject.cluster.ClusterEventListener;
-import org.onosproject.cluster.ClusterMetadata;
-import org.onosproject.cluster.ClusterMetadataAdminService;
-import org.onosproject.cluster.ClusterMetadataDiff;
-import org.onosproject.cluster.ClusterMetadataEvent;
-import org.onosproject.cluster.ClusterMetadataEventListener;
-import org.onosproject.cluster.ClusterMetadataService;
 import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ClusterStore;
-import org.onosproject.cluster.ClusterStoreDelegate;
 import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.DefaultPartition;
+import org.onosproject.cluster.UnifiedClusterAdminService;
+import org.onosproject.cluster.UnifiedClusterService;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.Partition;
-import org.onosproject.cluster.PartitionId;
 import org.onosproject.core.Version;
-import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.core.VersionService;
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onosproject.security.AppGuard.checkPermission;
 import static org.onosproject.security.AppPermission.Type.CLUSTER_READ;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -67,172 +46,122 @@
  */
 @Component(immediate = true)
 @Service
-public class ClusterManager
-        extends AbstractListenerManager<ClusterEvent, ClusterEventListener>
-        implements ClusterService, ClusterAdminService {
+public class ClusterManager implements ClusterService, ClusterAdminService {
 
-    public static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
-    private static final int DEFAULT_PARTITION_SIZE = 3;
     private final Logger log = getLogger(getClass());
 
-    private ClusterStoreDelegate delegate = new InternalStoreDelegate();
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private UnifiedClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterMetadataService clusterMetadataService;
+    private UnifiedClusterAdminService clusterAdminService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterMetadataAdminService clusterMetadataAdminService;
+    private VersionService versionService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterStore store;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected SystemService systemService;
-
-    private final AtomicReference<ClusterMetadata> currentMetadata = new AtomicReference<>();
-    private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
+    private Version version;
 
     @Activate
     public void activate() {
-        store.setDelegate(delegate);
-        eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
-        clusterMetadataService.addListener(metadataListener);
-        processMetadata(clusterMetadataService.getClusterMetadata());
+        version = versionService.version();
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
-        clusterMetadataService.removeListener(metadataListener);
-        store.unsetDelegate(delegate);
-        eventDispatcher.removeSink(ClusterEvent.class);
         log.info("Stopped");
     }
 
     @Override
     public ControllerNode getLocalNode() {
         checkPermission(CLUSTER_READ);
-        return store.getLocalNode();
+        return clusterService.getLocalNode();
     }
 
     @Override
     public Set<ControllerNode> getNodes() {
         checkPermission(CLUSTER_READ);
-        return store.getNodes();
+        return clusterService.getNodes()
+                .stream()
+                .filter(node -> clusterService.getVersion(node.id()).equals(version))
+                .collect(Collectors.toSet());
     }
 
     @Override
     public ControllerNode getNode(NodeId nodeId) {
         checkPermission(CLUSTER_READ);
-        checkNotNull(nodeId, INSTANCE_ID_NULL);
-        return store.getNode(nodeId);
+        Version nodeVersion = clusterService.getVersion(nodeId);
+        if (nodeVersion != null && nodeVersion.equals(version)) {
+            return clusterService.getNode(nodeId);
+        }
+        return null;
     }
 
     @Override
     public ControllerNode.State getState(NodeId nodeId) {
         checkPermission(CLUSTER_READ);
-        checkNotNull(nodeId, INSTANCE_ID_NULL);
-        return store.getState(nodeId);
+        Version nodeVersion = clusterService.getVersion(nodeId);
+        if (nodeVersion != null && nodeVersion.equals(version)) {
+            return clusterService.getState(nodeId);
+        }
+        return null;
     }
 
     @Override
     public Version getVersion(NodeId nodeId) {
         checkPermission(CLUSTER_READ);
-        checkNotNull(nodeId, INSTANCE_ID_NULL);
-        return store.getVersion(nodeId);
+        Version nodeVersion = clusterService.getVersion(nodeId);
+        if (nodeVersion != null && nodeVersion.equals(version)) {
+            return nodeVersion;
+        }
+        return null;
     }
 
     @Override
     public void markFullyStarted(boolean started) {
-        store.markFullyStarted(started);
+        clusterAdminService.markFullyStarted(started);
     }
 
     @Override
     public DateTime getLastUpdated(NodeId nodeId) {
         checkPermission(CLUSTER_READ);
-        return store.getLastUpdated(nodeId);
+        Version nodeVersion = clusterService.getVersion(nodeId);
+        if (nodeVersion != null && nodeVersion.equals(version)) {
+            return clusterService.getLastUpdated(nodeId);
+        }
+        return null;
     }
 
     @Override
     public void formCluster(Set<ControllerNode> nodes) {
-        formCluster(nodes, DEFAULT_PARTITION_SIZE);
+        clusterAdminService.formCluster(nodes);
     }
 
     @Override
     public void formCluster(Set<ControllerNode> nodes, int partitionSize) {
-        checkNotNull(nodes, "Nodes cannot be null");
-        checkArgument(!nodes.isEmpty(), "Nodes cannot be empty");
-
-        ClusterMetadata metadata = new ClusterMetadata("default", nodes, buildDefaultPartitions(nodes, partitionSize));
-        clusterMetadataAdminService.setClusterMetadata(metadata);
-        try {
-            log.warn("Shutting down container for cluster reconfiguration!");
-            // Clean up persistent state associated with previous cluster configuration.
-            Tools.removeDirectory(System.getProperty("karaf.data") + "/partitions");
-            systemService.reboot("now", SystemService.Swipe.NONE);
-        } catch (Exception e) {
-            log.error("Unable to reboot container", e);
-        }
+        clusterAdminService.formCluster(nodes, partitionSize);
     }
 
     @Override
     public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
-        checkNotNull(nodeId, INSTANCE_ID_NULL);
-        checkNotNull(ip, "IP address cannot be null");
-        checkArgument(tcpPort > 5000, "TCP port must be > 5000");
-        return store.addNode(nodeId, ip, tcpPort);
+        return clusterAdminService.addNode(nodeId, ip, tcpPort);
     }
 
     @Override
     public void removeNode(NodeId nodeId) {
-        checkNotNull(nodeId, INSTANCE_ID_NULL);
-        store.removeNode(nodeId);
-    }
-
-    // Store delegate to re-post events emitted from the store.
-    private class InternalStoreDelegate implements ClusterStoreDelegate {
-        @Override
-        public void notify(ClusterEvent event) {
-            post(event);
+        Version nodeVersion = clusterService.getVersion(nodeId);
+        if (nodeVersion != null && nodeVersion.equals(version)) {
+            clusterAdminService.removeNode(nodeId);
         }
     }
 
-    private static Set<Partition> buildDefaultPartitions(Collection<ControllerNode> nodes, int partitionSize) {
-        List<ControllerNode> sorted = new ArrayList<>(nodes);
-        Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString()));
-        Set<Partition> partitions = Sets.newHashSet();
-        // add partitions
-        int length = nodes.size();
-        int count = Math.min(partitionSize, length);
-        for (int i = 0; i < length; i++) {
-            int index = i;
-            Set<NodeId> set = new HashSet<>(count);
-            for (int j = 0; j < count; j++) {
-                set.add(sorted.get((i + j) % length).id());
-            }
-            partitions.add(new DefaultPartition(PartitionId.from((index + 1)), set));
-        }
-        return partitions;
+    @Override
+    public void addListener(ClusterEventListener listener) {
+        clusterService.addListener(listener);
     }
 
-    /**
-     * Processes metadata by adding and removing nodes from the cluster.
-     */
-    private synchronized void processMetadata(ClusterMetadata metadata) {
-        try {
-            ClusterMetadataDiff examiner =
-                    new ClusterMetadataDiff(currentMetadata.get(), metadata);
-            examiner.nodesAdded().forEach(node -> addNode(node.id(), node.ip(), node.tcpPort()));
-            examiner.nodesRemoved().forEach(this::removeNode);
-        } finally {
-            currentMetadata.set(metadata);
-        }
-    }
-
-    private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
-        @Override
-        public void event(ClusterMetadataEvent event) {
-            processMetadata(event.subject());
-        }
+    @Override
+    public void removeListener(ClusterEventListener listener) {
+        clusterService.removeListener(listener);
     }
 }
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/UnifiedClusterManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/UnifiedClusterManager.java
new file mode 100644
index 0000000..3617305
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/UnifiedClusterManager.java
@@ -0,0 +1,237 @@
+/*
+ * Copyright 2014-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.karaf.system.SystemService;
+import org.joda.time.DateTime;
+import org.onlab.packet.IpAddress;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterMetadata;
+import org.onosproject.cluster.ClusterMetadataAdminService;
+import org.onosproject.cluster.ClusterMetadataDiff;
+import org.onosproject.cluster.ClusterMetadataEvent;
+import org.onosproject.cluster.ClusterMetadataEventListener;
+import org.onosproject.cluster.ClusterMetadataService;
+import org.onosproject.cluster.ClusterStore;
+import org.onosproject.cluster.ClusterStoreDelegate;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultPartition;
+import org.onosproject.cluster.UnifiedClusterAdminService;
+import org.onosproject.cluster.UnifiedClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.Partition;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.core.Version;
+import org.onosproject.event.AbstractListenerManager;
+import org.slf4j.Logger;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.CLUSTER_READ;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of the cluster service.
+ */
+@Component(immediate = true)
+@Service
+public class UnifiedClusterManager
+        extends AbstractListenerManager<ClusterEvent, ClusterEventListener>
+        implements UnifiedClusterService, UnifiedClusterAdminService {
+
+    public static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
+    private static final int DEFAULT_PARTITION_SIZE = 3;
+    private final Logger log = getLogger(getClass());
+
+    private ClusterStoreDelegate delegate = new InternalStoreDelegate();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterMetadataService clusterMetadataService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterMetadataAdminService clusterMetadataAdminService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterStore store;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected SystemService systemService;
+
+    private final AtomicReference<ClusterMetadata> currentMetadata = new AtomicReference<>();
+    private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
+
+    @Activate
+    public void activate() {
+        store.setDelegate(delegate);
+        eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
+        clusterMetadataService.addListener(metadataListener);
+        processMetadata(clusterMetadataService.getClusterMetadata());
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        clusterMetadataService.removeListener(metadataListener);
+        store.unsetDelegate(delegate);
+        eventDispatcher.removeSink(ClusterEvent.class);
+        log.info("Stopped");
+    }
+
+    @Override
+    public ControllerNode getLocalNode() {
+        checkPermission(CLUSTER_READ);
+        return store.getLocalNode();
+    }
+
+    @Override
+    public Set<ControllerNode> getNodes() {
+        checkPermission(CLUSTER_READ);
+        return store.getNodes();
+    }
+
+    @Override
+    public ControllerNode getNode(NodeId nodeId) {
+        checkPermission(CLUSTER_READ);
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        return store.getNode(nodeId);
+    }
+
+    @Override
+    public ControllerNode.State getState(NodeId nodeId) {
+        checkPermission(CLUSTER_READ);
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        return store.getState(nodeId);
+    }
+
+    @Override
+    public Version getVersion(NodeId nodeId) {
+        checkPermission(CLUSTER_READ);
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        return store.getVersion(nodeId);
+    }
+
+    @Override
+    public void markFullyStarted(boolean started) {
+        store.markFullyStarted(started);
+    }
+
+    @Override
+    public DateTime getLastUpdated(NodeId nodeId) {
+        checkPermission(CLUSTER_READ);
+        return store.getLastUpdated(nodeId);
+    }
+
+    @Override
+    public void formCluster(Set<ControllerNode> nodes) {
+        formCluster(nodes, DEFAULT_PARTITION_SIZE);
+    }
+
+    @Override
+    public void formCluster(Set<ControllerNode> nodes, int partitionSize) {
+        checkNotNull(nodes, "Nodes cannot be null");
+        checkArgument(!nodes.isEmpty(), "Nodes cannot be empty");
+
+        ClusterMetadata metadata = new ClusterMetadata("default", nodes, buildDefaultPartitions(nodes, partitionSize));
+        clusterMetadataAdminService.setClusterMetadata(metadata);
+        try {
+            log.warn("Shutting down container for cluster reconfiguration!");
+            // Clean up persistent state associated with previous cluster configuration.
+            Tools.removeDirectory(System.getProperty("karaf.data") + "/partitions");
+            systemService.reboot("now", SystemService.Swipe.NONE);
+        } catch (Exception e) {
+            log.error("Unable to reboot container", e);
+        }
+    }
+
+    @Override
+    public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        checkNotNull(ip, "IP address cannot be null");
+        checkArgument(tcpPort > 5000, "TCP port must be > 5000");
+        return store.addNode(nodeId, ip, tcpPort);
+    }
+
+    @Override
+    public void removeNode(NodeId nodeId) {
+        checkNotNull(nodeId, INSTANCE_ID_NULL);
+        store.removeNode(nodeId);
+    }
+
+    // Store delegate to re-post events emitted from the store.
+    private class InternalStoreDelegate implements ClusterStoreDelegate {
+        @Override
+        public void notify(ClusterEvent event) {
+            post(event);
+        }
+    }
+
+    private static Set<Partition> buildDefaultPartitions(Collection<ControllerNode> nodes, int partitionSize) {
+        List<ControllerNode> sorted = new ArrayList<>(nodes);
+        Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString()));
+        Set<Partition> partitions = Sets.newHashSet();
+        // add partitions
+        int length = nodes.size();
+        int count = Math.min(partitionSize, length);
+        for (int i = 0; i < length; i++) {
+            int index = i;
+            Set<NodeId> set = new HashSet<>(count);
+            for (int j = 0; j < count; j++) {
+                set.add(sorted.get((i + j) % length).id());
+            }
+            partitions.add(new DefaultPartition(PartitionId.from((index + 1)), set));
+        }
+        return partitions;
+    }
+
+    /**
+     * Processes metadata by adding and removing nodes from the cluster.
+     */
+    private synchronized void processMetadata(ClusterMetadata metadata) {
+        try {
+            ClusterMetadataDiff examiner =
+                    new ClusterMetadataDiff(currentMetadata.get(), metadata);
+            examiner.nodesAdded().forEach(node -> addNode(node.id(), node.ip(), node.tcpPort()));
+            examiner.nodesRemoved().forEach(this::removeNode);
+        } finally {
+            currentMetadata.set(metadata);
+        }
+    }
+
+    private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
+        @Override
+        public void event(ClusterMetadataEvent event) {
+            processMetadata(event.subject());
+        }
+    }
+}
diff --git a/core/net/src/main/java/org/onosproject/core/impl/VersionManager.java b/core/net/src/main/java/org/onosproject/core/impl/VersionManager.java
index e734f7d..12210a9 100644
--- a/core/net/src/main/java/org/onosproject/core/impl/VersionManager.java
+++ b/core/net/src/main/java/org/onosproject/core/impl/VersionManager.java
@@ -54,6 +54,7 @@
             // version file not found, using default
             log.trace("Version file not found", e);
         }
+        log.info("Started");
     }
 
     @Override
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index d2b2536..bbf6d39 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -463,7 +463,7 @@
 
             // isReachable but was not MASTER or STANDBY, get a role and apply
             // Note: NONE triggers request to MastershipService
-            reassertRole(deviceId, NONE);
+            reassertRole(deviceId, mastershipService.getLocalRole(deviceId));
         }
     }
 
@@ -819,49 +819,32 @@
     private void reassertRole(final DeviceId did,
                               final MastershipRole nextRole) {
 
-        MastershipRole myNextRole = nextRole;
-        if (myNextRole == NONE) {
-            try {
-                mastershipService.requestRoleFor(did).get();
-                MastershipTerm term = termService.getMastershipTerm(did);
-                if (term != null && localNodeId.equals(term.master())) {
-                    myNextRole = MASTER;
-                } else {
-                    myNextRole = STANDBY;
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                log.error("Interrupted waiting for Mastership", e);
-            } catch (ExecutionException e) {
-                log.error("Encountered an error waiting for Mastership", e);
-            }
-        }
-
-        switch (myNextRole) {
+        switch (nextRole) {
             case MASTER:
                 final Device device = getDevice(did);
                 if ((device != null) && !isAvailable(did)) {
                     store.markOnline(did);
                 }
                 // TODO: should apply role only if there is mismatch
-                log.debug("Applying role {} to {}", myNextRole, did);
+                log.debug("Applying role {} to {}", nextRole, did);
                 if (!applyRoleAndProbe(did, MASTER)) {
-                    log.warn("Unsuccessful applying role {} to {}", myNextRole, did);
+                    log.warn("Unsuccessful applying role {} to {}", nextRole, did);
                     // immediately failed to apply role
                     mastershipService.relinquishMastership(did);
                     // FIXME disconnect?
                 }
                 break;
             case STANDBY:
-                log.debug("Applying role {} to {}", myNextRole, did);
+                log.debug("Applying role {} to {}", nextRole, did);
                 if (!applyRoleAndProbe(did, STANDBY)) {
-                    log.warn("Unsuccessful applying role {} to {}", myNextRole, did);
+                    log.warn("Unsuccessful applying role {} to {}", nextRole, did);
                     // immediately failed to apply role
                     mastershipService.relinquishMastership(did);
                     // FIXME disconnect?
                 }
                 break;
             case NONE:
+                break;
             default:
                 // should never reach here
                 log.error("You didn't see anything. I did not exist.");
diff --git a/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java b/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java
new file mode 100644
index 0000000..ce55eda
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java
@@ -0,0 +1,397 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.upgrade.impl;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.UnifiedClusterService;
+import org.onosproject.core.Version;
+import org.onosproject.core.VersionService;
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AtomicValue;
+import org.onosproject.store.service.AtomicValueEvent;
+import org.onosproject.store.service.AtomicValueEventListener;
+import org.onosproject.store.service.CoordinationService;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.upgrade.Upgrade;
+import org.onosproject.upgrade.UpgradeAdminService;
+import org.onosproject.upgrade.UpgradeEvent;
+import org.onosproject.upgrade.UpgradeEventListener;
+import org.onosproject.upgrade.UpgradeService;
+import org.slf4j.Logger;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Upgrade service implementation.
+ * <p>
+ * This implementation uses the {@link CoordinationService} to store upgrade state in a version-agnostic primitive.
+ * Upgrade state can be seen by current and future version nodes.
+ */
+@Component(immediate = true)
+@Service
+public class UpgradeManager
+        extends AbstractListenerManager<UpgradeEvent, UpgradeEventListener>
+        implements UpgradeService, UpgradeAdminService {
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected VersionService versionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoordinationService coordinationService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected UnifiedClusterService clusterService;
+
+    private Version localVersion;
+    private AtomicValue<Upgrade> state;
+    private final AtomicReference<Upgrade> currentState = new AtomicReference<>();
+    private final AtomicValueEventListener<Upgrade> stateListener = event -> handleChange(event);
+
+    @Activate
+    public void activate() {
+        state = coordinationService.<Upgrade>atomicValueBuilder()
+                .withName("onos-upgrade-state")
+                .withSerializer(Serializer.using(KryoNamespaces.API))
+                .build()
+                .asAtomicValue();
+        localVersion = versionService.version();
+
+        currentState.set(state.get());
+        if (currentState.get() == null) {
+            currentState.set(new Upgrade(localVersion, localVersion, Upgrade.Status.INACTIVE));
+            state.set(currentState.get());
+        }
+
+        Upgrade upgrade = currentState.get();
+
+        // If the upgrade state is not initialized, ensure this node matches the version of the cluster.
+        if (!upgrade.status().active() && !Objects.equals(upgrade.source(), localVersion)) {
+            log.error("Node version {} inconsistent with cluster version {}", localVersion, upgrade.source());
+            throw new IllegalStateException("Node version " + localVersion +
+                    " inconsistent with cluster version " + upgrade.source());
+        }
+
+        // If the upgrade state is initialized then check the node version.
+        if (upgrade.status() == Upgrade.Status.INITIALIZED) {
+            // If the source version equals the target version, attempt to update the target version.
+            if (Objects.equals(upgrade.source(), upgrade.target()) && !Objects.equals(upgrade.target(), localVersion)) {
+                upgrade = new Upgrade(upgrade.source(), localVersion, upgrade.status());
+                currentState.set(upgrade);
+                state.set(upgrade);
+            }
+        }
+
+        // If the upgrade status is active, verify that the local version matches the upgrade version.
+        if (upgrade.status().active() && !Objects.equals(upgrade.source(), upgrade.target())) {
+            // If the upgrade source/target are not equal, validate that the node's version is consistent
+            // with versions in the upgrade. There are two possibilities: that a not-yet-upgraded node is being
+            // restarted, or that a node has been upgraded, so we need to check that this node is running either
+            // the source or target version.
+            if (!Objects.equals(localVersion, upgrade.source()) && !Objects.equals(localVersion, upgrade.target())) {
+                log.error("Cannot upgrade node to version {}; Upgrade to {} already in progress",
+                        localVersion, upgrade.target());
+                throw new IllegalStateException("Cannot upgrade node to version " + localVersion + "; Upgrade to " +
+                        upgrade.target() + " already in progress");
+            }
+        }
+
+        state.addListener(stateListener);
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        state.removeListener(stateListener);
+        log.info("Stopped");
+    }
+
+    @Override
+    public boolean isUpgrading() {
+        return getState().status().active();
+    }
+
+    @Override
+    public Upgrade getState() {
+        return currentState.get();
+    }
+
+    @Override
+    public Version getVersion() {
+        Upgrade upgrade = currentState.get();
+        return upgrade.status().upgraded()
+                ? upgrade.target()
+                : upgrade.source();
+    }
+
+    @Override
+    public boolean isLocalActive() {
+        return localVersion.equals(getVersion());
+    }
+
+    @Override
+    public boolean isLocalUpgraded() {
+        Upgrade upgrade = currentState.get();
+        return upgrade.status().active()
+                && !upgrade.source().equals(upgrade.target())
+                && localVersion.equals(upgrade.target());
+    }
+
+    @Override
+    public void initialize() {
+        Upgrade inactive = currentState.get();
+
+        // If the current upgrade status is active, fail initialization.
+        if (inactive.status().active()) {
+            throw new IllegalStateException("Upgrade already active");
+        }
+
+        // Set the upgrade status to INITIALIZING.
+        Upgrade initializing = new Upgrade(
+                localVersion,
+                localVersion,
+                Upgrade.Status.INITIALIZING);
+        if (!state.compareAndSet(inactive, initializing)) {
+            throw new IllegalStateException("Concurrent upgrade modification");
+        } else {
+            currentState.set(initializing);
+
+            // Set the upgrade status to INITIALIZED.
+            Upgrade initialized = new Upgrade(
+                    initializing.source(),
+                    initializing.target(),
+                    Upgrade.Status.INITIALIZED);
+            if (!state.compareAndSet(initializing, initialized)) {
+                throw new IllegalStateException("Concurrent upgrade modification");
+            } else {
+                currentState.set(initialized);
+            }
+        }
+    }
+
+    @Override
+    public void upgrade() {
+        Upgrade initialized = currentState.get();
+
+        // If the current upgrade status is not INITIALIZED, throw an exception.
+        if (initialized.status() != Upgrade.Status.INITIALIZED) {
+            throw new IllegalStateException("Upgrade not initialized");
+        }
+
+        // Set the upgrade status to UPGRADING.
+        Upgrade upgrading = new Upgrade(
+                initialized.source(),
+                initialized.target(),
+                Upgrade.Status.UPGRADING);
+        if (!state.compareAndSet(initialized, upgrading)) {
+            throw new IllegalStateException("Concurrent upgrade modification");
+        } else {
+            currentState.set(upgrading);
+
+            // Set the upgrade status to UPGRADED.
+            Upgrade upgraded = new Upgrade(
+                    upgrading.source(),
+                    upgrading.target(),
+                    Upgrade.Status.UPGRADED);
+            if (!state.compareAndSet(upgrading, upgraded)) {
+                throw new IllegalStateException("Concurrent upgrade modification");
+            } else {
+                currentState.set(upgraded);
+            }
+        }
+    }
+
+    @Override
+    public void commit() {
+        Upgrade upgraded = currentState.get();
+
+        // If the current upgrade status is not UPGRADED, throw an exception.
+        if (upgraded.status() != Upgrade.Status.UPGRADED) {
+            throw new IllegalStateException("Upgrade not performed");
+        }
+
+        // Determine whether any nodes have not been upgraded to the target version.
+        boolean upgradeComplete = clusterService.getNodes()
+                .stream()
+                .allMatch(node -> {
+                    ControllerNode.State state = clusterService.getState(node.id());
+                    Version version = clusterService.getVersion(node.id());
+                    return state.isActive() && version != null && version.equals(upgraded.target());
+                });
+
+        // If some nodes have not yet been upgraded, throw an exception.
+        if (!upgradeComplete) {
+            throw new IllegalStateException("Some nodes have not yet been upgraded to version " + upgraded.target());
+        }
+
+        // Set the upgrade status to COMMITTING.
+        Upgrade committing = new Upgrade(
+                upgraded.source(),
+                upgraded.target(),
+                Upgrade.Status.COMMITTING);
+        if (!state.compareAndSet(upgraded, committing)) {
+            throw new IllegalStateException("Concurrent upgrade modification");
+        } else {
+            currentState.set(committing);
+
+            // Set the upgrade status to COMMITTED.
+            Upgrade committed = new Upgrade(
+                    committing.source(),
+                    committing.target(),
+                    Upgrade.Status.COMMITTED);
+            if (!state.compareAndSet(committing, committed)) {
+                throw new IllegalStateException("Concurrent upgrade modification");
+            } else {
+                currentState.set(committed);
+
+                // Set the upgrade status to INACTIVE.
+                Upgrade inactive = new Upgrade(
+                        localVersion,
+                        localVersion,
+                        Upgrade.Status.INACTIVE);
+                if (!state.compareAndSet(committed, inactive)) {
+                    throw new IllegalStateException("Concurrent upgrade modification");
+                } else {
+                    currentState.set(inactive);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void rollback() {
+        Upgrade upgraded = currentState.get();
+
+        // If the current upgrade status is not UPGRADED, throw an exception.
+        if (upgraded.status() != Upgrade.Status.UPGRADED) {
+            throw new IllegalStateException("Upgrade not performed");
+        }
+
+        // Set the upgrade status to ROLLING_BACK.
+        Upgrade rollingBack = new Upgrade(
+                upgraded.source(),
+                upgraded.target(),
+                Upgrade.Status.ROLLING_BACK);
+        if (!state.compareAndSet(upgraded, rollingBack)) {
+            throw new IllegalStateException("Concurrent upgrade modification");
+        } else {
+            currentState.set(rollingBack);
+
+            // Set the upgrade status to ROLLED_BACK.
+            Upgrade rolledBack = new Upgrade(
+                    rollingBack.source(),
+                    rollingBack.target(),
+                    Upgrade.Status.ROLLED_BACK);
+            if (!state.compareAndSet(rollingBack, rolledBack)) {
+                throw new IllegalStateException("Concurrent upgrade modification");
+            } else {
+                currentState.set(rolledBack);
+            }
+        }
+    }
+
+    @Override
+    public void reset() {
+        Upgrade upgraded = currentState.get();
+
+        // If the current upgrade status is not INITIALIZED or ROLLED_BACK, throw an exception.
+        if (upgraded.status() != Upgrade.Status.INITIALIZED
+                && upgraded.status() != Upgrade.Status.ROLLED_BACK) {
+            throw new IllegalStateException("Upgrade not rolled back");
+        }
+
+        // Determine whether any nodes are still running the target version.
+        boolean rollbackComplete = clusterService.getNodes()
+                .stream()
+                .allMatch(node -> {
+                    ControllerNode.State state = clusterService.getState(node.id());
+                    Version version = clusterService.getVersion(node.id());
+                    return state.isActive() && version != null && version.equals(upgraded.source());
+                });
+
+        // If some nodes have not yet been downgraded, throw an exception.
+        if (!rollbackComplete) {
+            throw new IllegalStateException("Some nodes have not yet been downgraded to version " + upgraded.source());
+        }
+
+        // Set the upgrade status to RESETTING.
+        Upgrade resetting = new Upgrade(
+                upgraded.source(),
+                upgraded.target(),
+                Upgrade.Status.RESETTING);
+        if (!state.compareAndSet(upgraded, resetting)) {
+            throw new IllegalStateException("Concurrent upgrade modification");
+        } else {
+            currentState.set(resetting);
+
+            // Set the upgrade status to RESET.
+            Upgrade reset = new Upgrade(
+                    resetting.source(),
+                    resetting.target(),
+                    Upgrade.Status.RESET);
+            if (!state.compareAndSet(resetting, reset)) {
+                throw new IllegalStateException("Concurrent upgrade modification");
+            } else {
+                currentState.set(reset);
+
+                // Set the upgrade status to INACTIVE.
+                Upgrade inactive = new Upgrade(
+                        localVersion,
+                        localVersion,
+                        Upgrade.Status.INACTIVE);
+                if (!state.compareAndSet(reset, inactive)) {
+                    throw new IllegalStateException("Concurrent upgrade modification");
+                } else {
+                    currentState.set(inactive);
+                }
+            }
+        }
+    }
+
+    private void handleChange(AtomicValueEvent<Upgrade> event) {
+        currentState.set(event.newValue());
+        switch (event.newValue().status()) {
+            case INITIALIZED:
+                post(new UpgradeEvent(UpgradeEvent.Type.INITIALIZED, event.newValue()));
+                break;
+            case UPGRADED:
+                post(new UpgradeEvent(UpgradeEvent.Type.UPGRADED, event.newValue()));
+                break;
+            case COMMITTED:
+                post(new UpgradeEvent(UpgradeEvent.Type.COMMITTED, event.newValue()));
+                break;
+            case ROLLED_BACK:
+                post(new UpgradeEvent(UpgradeEvent.Type.ROLLED_BACK, event.newValue()));
+                break;
+            case RESET:
+                post(new UpgradeEvent(UpgradeEvent.Type.RESET, event.newValue()));
+                break;
+            default:
+                break;
+        }
+    }
+}
diff --git a/core/net/src/main/java/org/onosproject/upgrade/impl/package-info.java b/core/net/src/main/java/org/onosproject/upgrade/impl/package-info.java
new file mode 100644
index 0000000..013c93a
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/upgrade/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Software upgrade management.
+ */
+package org.onosproject.upgrade.impl;
diff --git a/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java b/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java
new file mode 100644
index 0000000..4137744
--- /dev/null
+++ b/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java
@@ -0,0 +1,240 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.upgrade.impl;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+import org.onlab.packet.IpAddress;
+import org.onosproject.cluster.UnifiedClusterServiceAdapter;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.Version;
+import org.onosproject.core.VersionServiceAdapter;
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.AsyncAtomicValueAdapter;
+import org.onosproject.store.service.AtomicValue;
+import org.onosproject.store.service.AtomicValueAdapter;
+import org.onosproject.store.service.AtomicValueBuilder;
+import org.onosproject.store.service.CoordinationServiceAdapter;
+import org.onosproject.upgrade.Upgrade;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Upgrade manager test.
+ */
+public class UpgradeManagerTest {
+
+    /**
+     * Creates a new upgrade manager to test.
+     *
+     * @param version the local node version
+     * @param state the initial upgrade state
+     * @param versions a list of controller node versions
+     * @return the activated upgrade manager
+     */
+    @SuppressWarnings("unchecked")
+    private UpgradeManager createUpgradeManager(Version version, Upgrade state, List<Version> versions) {
+        UpgradeManager upgradeManager = new UpgradeManager();
+        upgradeManager.clusterService = new UnifiedClusterServiceAdapter() {
+            @Override
+            public Set<ControllerNode> getNodes() {
+                AtomicInteger nodeCounter = new AtomicInteger();
+                return versions.stream()
+                        .map(v -> {
+                            int nodeId = nodeCounter.getAndIncrement();
+                            return new DefaultControllerNode(
+                                    NodeId.nodeId(String.valueOf(nodeId)),
+                                    IpAddress.valueOf("127.0.0.1"),
+                                    nodeId);
+                        })
+                        .collect(Collectors.toSet());
+            }
+
+            @Override
+            public ControllerNode.State getState(NodeId nodeId) {
+                return ControllerNode.State.READY;
+            }
+
+            @Override
+            public Version getVersion(NodeId nodeId) {
+                return versions.get(Integer.parseInt(nodeId.id()));
+            }
+        };
+
+        upgradeManager.versionService = new VersionServiceAdapter() {
+            @Override
+            public Version version() {
+                return version;
+            }
+        };
+
+        upgradeManager.coordinationService = new CoordinationServiceAdapter() {
+            @Override
+            public <V> AtomicValueBuilder<V> atomicValueBuilder() {
+                return new AtomicValueBuilder<V>() {
+                    @Override
+                    public AsyncAtomicValue<V> build() {
+                        return new AsyncAtomicValueAdapter() {
+                            @Override
+                            public AtomicValue asAtomicValue() {
+                                return new AtomicValueAdapter() {
+                                    private Object value = state;
+
+                                    @Override
+                                    public void set(Object value) {
+                                        this.value = value;
+                                    }
+
+                                    @Override
+                                    public Object get() {
+                                        return value;
+                                    }
+
+                                    @Override
+                                    public boolean compareAndSet(Object expect, Object update) {
+                                        if ((value == null && expect == null)
+                                                || (value != null && value.equals(expect))) {
+                                            value = update;
+                                            return true;
+                                        }
+                                        return false;
+                                    }
+                                };
+                            }
+                        };
+                    }
+                };
+            }
+        };
+
+        upgradeManager.activate();
+        return upgradeManager;
+    }
+
+    @Test
+    public void testFailedCommit() throws Exception {
+        UpgradeManager upgradeManager = createUpgradeManager(
+                Version.version("1.0.0"),
+                new Upgrade(Version.version("1.0.0"), Version.version("1.0.0"), Upgrade.Status.INACTIVE),
+                Arrays.asList(Version.version("1.0.0"), Version.version("1.0.0"), Version.version("1.0.1")));
+
+        assertEquals(Upgrade.Status.INACTIVE, upgradeManager.getState().status());
+        assertTrue(upgradeManager.isLocalActive());
+        assertFalse(upgradeManager.isLocalUpgraded());
+
+        upgradeManager.initialize();
+
+        assertEquals(Upgrade.Status.INITIALIZED, upgradeManager.getState().status());
+        assertEquals(Version.version("1.0.0"), upgradeManager.getState().source());
+        assertEquals(Version.version("1.0.0"), upgradeManager.getState().target());
+        assertEquals(Version.version("1.0.0"), upgradeManager.getVersion());
+        assertTrue(upgradeManager.isLocalActive());
+        assertFalse(upgradeManager.isLocalUpgraded());
+
+        upgradeManager.upgrade();
+        assertEquals(Upgrade.Status.UPGRADED, upgradeManager.getState().status());
+
+        try {
+            upgradeManager.commit();
+            fail();
+        } catch (IllegalStateException e) {
+        }
+    }
+
+    @Test
+    public void testSuccessfulCommit() throws Exception {
+        UpgradeManager upgradeManager = createUpgradeManager(
+                Version.version("1.0.1"),
+                new Upgrade(Version.version("1.0.0"), Version.version("1.0.1"), Upgrade.Status.UPGRADED),
+                Arrays.asList(Version.version("1.0.1"), Version.version("1.0.1"), Version.version("1.0.1")));
+
+        assertEquals(Upgrade.Status.UPGRADED, upgradeManager.getState().status());
+        assertTrue(upgradeManager.isLocalActive());
+        assertTrue(upgradeManager.isLocalUpgraded());
+
+        upgradeManager.commit();
+        assertEquals(Upgrade.Status.INACTIVE, upgradeManager.getState().status());
+    }
+
+    @Test
+    public void testFailedReset() throws Exception {
+        UpgradeManager upgradeManager = createUpgradeManager(
+                Version.version("1.0.0"),
+                new Upgrade(Version.version("1.0.0"), Version.version("1.0.1"), Upgrade.Status.INITIALIZED),
+                Arrays.asList(Version.version("1.0.0"), Version.version("1.0.0"), Version.version("1.0.1")));
+
+        assertEquals(Upgrade.Status.INITIALIZED, upgradeManager.getState().status());
+        assertEquals(Version.version("1.0.0"), upgradeManager.getState().source());
+        assertEquals(Version.version("1.0.1"), upgradeManager.getState().target());
+        assertEquals(Version.version("1.0.0"), upgradeManager.getVersion());
+        assertTrue(upgradeManager.isLocalActive());
+        assertFalse(upgradeManager.isLocalUpgraded());
+
+        upgradeManager.upgrade();
+        assertEquals(Upgrade.Status.UPGRADED, upgradeManager.getState().status());
+        assertEquals(Version.version("1.0.1"), upgradeManager.getVersion());
+
+        upgradeManager.rollback();
+        assertEquals(Upgrade.Status.ROLLED_BACK, upgradeManager.getState().status());
+
+        try {
+            upgradeManager.reset();
+            fail();
+        } catch (IllegalStateException e) {
+        }
+    }
+
+    @Test
+    public void testSuccessfulResetFromInitialized() throws Exception {
+        UpgradeManager upgradeManager = createUpgradeManager(
+                Version.version("1.0.0"),
+                new Upgrade(Version.version("1.0.0"), Version.version("1.0.0"), Upgrade.Status.INITIALIZED),
+                Arrays.asList(Version.version("1.0.0"), Version.version("1.0.0"), Version.version("1.0.0")));
+
+        assertEquals(Upgrade.Status.INITIALIZED, upgradeManager.getState().status());
+        assertTrue(upgradeManager.isLocalActive());
+        assertFalse(upgradeManager.isLocalUpgraded());
+
+        upgradeManager.reset();
+        assertEquals(Upgrade.Status.INACTIVE, upgradeManager.getState().status());
+    }
+
+    @Test
+    public void testSuccessfulResetFromRolledBack() throws Exception {
+        UpgradeManager upgradeManager = createUpgradeManager(
+                Version.version("1.0.0"),
+                new Upgrade(Version.version("1.0.0"), Version.version("1.0.1"), Upgrade.Status.ROLLED_BACK),
+                Arrays.asList(Version.version("1.0.0"), Version.version("1.0.0"), Version.version("1.0.0")));
+
+        assertEquals(Upgrade.Status.ROLLED_BACK, upgradeManager.getState().status());
+        assertTrue(upgradeManager.isLocalActive());
+        assertFalse(upgradeManager.isLocalUpgraded());
+
+        upgradeManager.reset();
+        assertEquals(Upgrade.Status.INACTIVE, upgradeManager.getState().status());
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
index 78d2d4e..2bb1ea3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -15,14 +15,12 @@
  */
 package org.onosproject.store.cluster.impl;
 
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
@@ -37,13 +35,21 @@
 import org.onosproject.cluster.LeadershipStore;
 import org.onosproject.cluster.LeadershipStoreDelegate;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.core.Version;
+import org.onosproject.core.VersionService;
 import org.onosproject.event.Change;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.service.DistributedPrimitive.Status;
+import org.onosproject.store.service.CoordinationService;
 import org.onosproject.store.service.LeaderElector;
-import org.onosproject.store.service.StorageService;
+import org.onosproject.upgrade.UpgradeEvent;
+import org.onosproject.upgrade.UpgradeEventListener;
+import org.onosproject.upgrade.UpgradeService;
 import org.slf4j.Logger;
 
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
 /**
  * Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
  * primitive.
@@ -54,25 +60,41 @@
     extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
     implements LeadershipStore {
 
+    private static final char VERSION_SEP = '|';
+
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected StorageService storageService;
+    protected CoordinationService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected VersionService versionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected UpgradeService upgradeService;
 
     private ExecutorService statusChangeHandler;
     private NodeId localNodeId;
     private LeaderElector leaderElector;
     private final Map<String, Leadership> localLeaderCache = Maps.newConcurrentMap();
+    private final UpgradeEventListener upgradeListener = new InternalUpgradeEventListener();
 
     private final Consumer<Change<Leadership>> leadershipChangeListener =
             change -> {
                 Leadership oldValue = change.oldValue();
                 Leadership newValue = change.newValue();
+
+                // If the topic is not relevant to this version, skip the event.
+                if (!isLocalTopic(newValue.topic())) {
+                    return;
+                }
+
                 boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
                 boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
+
                 LeadershipEvent.Type eventType = null;
                 if (leaderChanged && candidatesChanged) {
                     eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
@@ -83,7 +105,10 @@
                 if (!leaderChanged && candidatesChanged) {
                     eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
                 }
-                notifyDelegate(new LeadershipEvent(eventType, change.newValue()));
+                notifyDelegate(new LeadershipEvent(eventType, new Leadership(
+                        parseTopic(change.newValue().topic()),
+                        change.newValue().leader(),
+                        change.newValue().candidates())));
                 // Update local cache of currently held leaderships
                 if (Objects.equals(newValue.leaderNodeId(), localNodeId)) {
                     localLeaderCache.put(newValue.topic(), newValue);
@@ -101,18 +126,27 @@
             // Service Restored
             localLeaderCache.forEach((topic, leadership) -> leaderElector.run(topic, localNodeId));
             leaderElector.getLeaderships().forEach((topic, leadership) ->
-                    notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_RESTORED, leadership)));
+                    notifyDelegate(new LeadershipEvent(
+                            LeadershipEvent.Type.SERVICE_RESTORED,
+                            new Leadership(
+                                    parseTopic(leadership.topic()),
+                                    leadership.leader(),
+                                    leadership.candidates()))));
         } else if (status == Status.SUSPENDED) {
             // Service Suspended
             localLeaderCache.forEach((topic, leadership) ->
-                    notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_DISRUPTED, leadership)));
+                    notifyDelegate(new LeadershipEvent(
+                            LeadershipEvent.Type.SERVICE_DISRUPTED,
+                            new Leadership(
+                                    parseTopic(leadership.topic()),
+                                    leadership.leader(),
+                                    leadership.candidates()))));
         } else {
             // Should be only inactive state
             return;
         }
     }
 
-
     @Activate
     public void activate() {
         statusChangeHandler = Executors.newSingleThreadExecutor(
@@ -124,6 +158,7 @@
                       .asLeaderElector();
         leaderElector.addChangeListener(leadershipChangeListener);
         leaderElector.addStatusChangeListener(clientStatusListener);
+        upgradeService.addListener(upgradeListener);
         log.info("Started");
     }
 
@@ -131,18 +166,20 @@
     public void deactivate() {
         leaderElector.removeChangeListener(leadershipChangeListener);
         leaderElector.removeStatusChangeListener(clientStatusListener);
+        upgradeService.removeListener(upgradeListener);
         statusChangeHandler.shutdown();
         log.info("Stopped");
     }
 
     @Override
     public Leadership addRegistration(String topic) {
-        return leaderElector.run(topic, localNodeId);
+        leaderElector.run(getLocalTopic(topic), localNodeId);
+        return getLeadership(topic);
     }
 
     @Override
     public void removeRegistration(String topic) {
-        leaderElector.withdraw(topic);
+        leaderElector.withdraw(getLocalTopic(topic));
     }
 
     @Override
@@ -152,21 +189,108 @@
 
     @Override
     public boolean moveLeadership(String topic, NodeId toNodeId) {
-        return leaderElector.anoint(topic, toNodeId);
+        return leaderElector.anoint(getTopicFor(topic, toNodeId), toNodeId);
     }
 
     @Override
     public boolean makeTopCandidate(String topic, NodeId nodeId) {
-        return leaderElector.promote(topic, nodeId);
+        return leaderElector.promote(getTopicFor(topic, nodeId), nodeId);
     }
 
     @Override
     public Leadership getLeadership(String topic) {
-        return leaderElector.getLeadership(topic);
+        Leadership leadership = leaderElector.getLeadership(getActiveTopic(topic));
+        return leadership != null ? new Leadership(
+                parseTopic(leadership.topic()),
+                leadership.leader(),
+                leadership.candidates()) : null;
     }
 
     @Override
     public Map<String, Leadership> getLeaderships() {
-        return leaderElector.getLeaderships();
+        Map<String, Leadership> leaderships = leaderElector.getLeaderships();
+        return leaderships.entrySet().stream()
+                .filter(e -> isActiveTopic(e.getKey()))
+                .collect(Collectors.toMap(e -> parseTopic(e.getKey()),
+                        e -> new Leadership(parseTopic(e.getKey()), e.getValue().leader(), e.getValue().candidates())));
+    }
+
+    /**
+     * Returns a leader elector topic namespaced with the local node's version.
+     *
+     * @param topic the base topic
+     * @return a topic string namespaced with the local node's version
+     */
+    private String getLocalTopic(String topic) {
+        return topic + VERSION_SEP + versionService.version();
+    }
+
+    /**
+     * Returns a leader elector topic namespaced with the current cluster version.
+     *
+     * @param topic the base topic
+     * @return a topic string namespaced with the current cluster version
+     */
+    private String getActiveTopic(String topic) {
+        return topic + VERSION_SEP + upgradeService.getVersion();
+    }
+
+    /**
+     * Returns whether the given topic is a topic for the local version.
+     *
+     * @param topic the topic to check
+     * @return whether the given topic is relevant to the local version
+     */
+    private boolean isLocalTopic(String topic) {
+        return topic.endsWith(versionService.version().toString());
+    }
+
+    /**
+     * Returns whether the given topic is a topic for the current cluster version.
+     *
+     * @param topic the topic to check
+     * @return whether the given topic is relevant to the current cluster version
+     */
+    private boolean isActiveTopic(String topic) {
+        return topic.endsWith(VERSION_SEP + upgradeService.getVersion().toString());
+    }
+
+    /**
+     * Parses a topic string, returning the base topic.
+     *
+     * @param topic the topic string to parse
+     * @return the base topic string
+     */
+    private String parseTopic(String topic) {
+        return topic.substring(0, topic.lastIndexOf(VERSION_SEP));
+    }
+
+    /**
+     * Returns the versioned topic for the given node.
+     *
+     * @param topic the topic for the given node
+     * @param nodeId the node for which to return the namespaced topic
+     * @return the versioned topic for the given node
+     */
+    private String getTopicFor(String topic, NodeId nodeId) {
+        Version nodeVersion = clusterService.getVersion(nodeId);
+        return nodeVersion != null ? topic + VERSION_SEP + nodeVersion : topic + VERSION_SEP + versionService.version();
+    }
+
+    /**
+     * Internal upgrade event listener.
+     */
+    private class InternalUpgradeEventListener implements UpgradeEventListener {
+        @Override
+        public void event(UpgradeEvent event) {
+            if (event.type() == UpgradeEvent.Type.UPGRADED || event.type() == UpgradeEvent.Type.ROLLED_BACK) {
+                // Iterate through all current leaderships for the new version and trigger events.
+                for (Leadership leadership : getLeaderships().values()) {
+                    notifyDelegate(new LeadershipEvent(
+                            LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED,
+                            leadership));
+                }
+            }
+        }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java
new file mode 100644
index 0000000..aa96131
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/AbstractClusterCommunicationManager.java
@@ -0,0 +1,347 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.messaging.impl;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Throwables;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.UnifiedClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.cluster.messaging.MessagingService;
+import org.onosproject.utils.MeteringAgent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
+
+@Component(componentAbstract = true)
+public abstract class AbstractClusterCommunicationManager
+        implements ClusterCommunicationService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
+    private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
+
+    private static final String PRIMITIVE_NAME = "clusterCommunication";
+    private static final String SUBJECT_PREFIX = "subject";
+    private static final String ENDPOINT_PREFIX = "endpoint";
+
+    private static final String SERIALIZING = "serialization";
+    private static final String DESERIALIZING = "deserialization";
+    private static final String NODE_PREFIX = "node:";
+    private static final String ROUND_TRIP_SUFFIX = ".rtt";
+    private static final String ONE_WAY_SUFFIX = ".oneway";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected UnifiedClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MessagingService messagingService;
+
+    private NodeId localNodeId;
+
+    /**
+     * Returns the type for the given message subject.
+     *
+     * @param subject the type for the given message subject
+     * @return the message subject
+     */
+    protected abstract String getType(MessageSubject subject);
+
+    @Activate
+    public void activate() {
+        localNodeId = clusterService.getLocalNode().id();
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public <M> void broadcast(M message,
+                              MessageSubject subject,
+                              Function<M, byte[]> encoder) {
+        checkPermission(CLUSTER_WRITE);
+        multicast(message,
+                  subject,
+                  encoder,
+                  clusterService.getNodes()
+                      .stream()
+                      .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
+                      .map(ControllerNode::id)
+                      .collect(Collectors.toSet()));
+    }
+
+    @Override
+    public <M> void broadcastIncludeSelf(M message,
+                                         MessageSubject subject,
+                                         Function<M, byte[]> encoder) {
+        checkPermission(CLUSTER_WRITE);
+        multicast(message,
+                  subject,
+                  encoder,
+                  clusterService.getNodes()
+                      .stream()
+                      .map(ControllerNode::id)
+                      .collect(Collectors.toSet()));
+    }
+
+    @Override
+    public <M> CompletableFuture<Void> unicast(M message,
+                                               MessageSubject subject,
+                                               Function<M, byte[]> encoder,
+                                               NodeId toNodeId) {
+        checkPermission(CLUSTER_WRITE);
+        try {
+            byte[] payload = new ClusterMessage(
+                    localNodeId,
+                    subject,
+                    timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
+                    ).getBytes();
+            return doUnicast(subject, payload, toNodeId);
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
+    }
+
+    @Override
+    public <M> void multicast(M message,
+                              MessageSubject subject,
+                              Function<M, byte[]> encoder,
+                              Set<NodeId> nodes) {
+        checkPermission(CLUSTER_WRITE);
+        byte[] payload = new ClusterMessage(
+                localNodeId,
+                subject,
+                timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
+                .getBytes();
+        nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
+    }
+
+    @Override
+    public <M, R> CompletableFuture<R> sendAndReceive(M message,
+                                                      MessageSubject subject,
+                                                      Function<M, byte[]> encoder,
+                                                      Function<byte[], R> decoder,
+                                                      NodeId toNodeId) {
+        checkPermission(CLUSTER_WRITE);
+        try {
+            ClusterMessage envelope = new ClusterMessage(
+                    clusterService.getLocalNode().id(),
+                    subject,
+                    timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
+                            apply(message));
+            return sendAndReceive(subject, envelope.getBytes(), toNodeId).
+                    thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
+    }
+
+    private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+        ControllerNode node = clusterService.getNode(toNodeId);
+        checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
+        Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
+        MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX);
+        return messagingService.sendAsync(nodeEp, getType(subject), payload).whenComplete((r, e) -> context.stop(e));
+    }
+
+    private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+        ControllerNode node = clusterService.getNode(toNodeId);
+        checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
+        Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
+        MeteringAgent.Context epContext = endpointMeteringAgent.
+                startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
+        MeteringAgent.Context subjectContext = subjectMeteringAgent.
+                startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
+        return messagingService.sendAndReceive(nodeEp, getType(subject), payload).
+                whenComplete((bytes, throwable) -> {
+                    subjectContext.stop(throwable);
+                    epContext.stop(throwable);
+                });
+    }
+
+    @Override
+    public void addSubscriber(MessageSubject subject,
+                              ClusterMessageHandler subscriber,
+                              ExecutorService executor) {
+        checkPermission(CLUSTER_WRITE);
+        messagingService.registerHandler(getType(subject),
+                new InternalClusterMessageHandler(subscriber),
+                executor);
+    }
+
+    @Override
+    public void removeSubscriber(MessageSubject subject) {
+        checkPermission(CLUSTER_WRITE);
+        messagingService.unregisterHandler(getType(subject));
+    }
+
+    @Override
+    public <M, R> void addSubscriber(MessageSubject subject,
+            Function<byte[], M> decoder,
+            Function<M, R> handler,
+            Function<R, byte[]> encoder,
+            Executor executor) {
+        checkPermission(CLUSTER_WRITE);
+        messagingService.registerHandler(getType(subject),
+                new InternalMessageResponder<M, R>(decoder, encoder, m -> {
+                    CompletableFuture<R> responseFuture = new CompletableFuture<>();
+                    executor.execute(() -> {
+                        try {
+                            responseFuture.complete(handler.apply(m));
+                        } catch (Exception e) {
+                            responseFuture.completeExceptionally(e);
+                        }
+                    });
+                    return responseFuture;
+                }));
+    }
+
+    @Override
+    public <M, R> void addSubscriber(MessageSubject subject,
+            Function<byte[], M> decoder,
+            Function<M, CompletableFuture<R>> handler,
+            Function<R, byte[]> encoder) {
+        checkPermission(CLUSTER_WRITE);
+        messagingService.registerHandler(getType(subject),
+                new InternalMessageResponder<>(decoder, encoder, handler));
+    }
+
+    @Override
+    public <M> void addSubscriber(MessageSubject subject,
+            Function<byte[], M> decoder,
+            Consumer<M> handler,
+            Executor executor) {
+        checkPermission(CLUSTER_WRITE);
+        messagingService.registerHandler(getType(subject),
+                new InternalMessageConsumer<>(decoder, handler),
+                executor);
+    }
+
+    /**
+     * Performs the timed function, returning the value it would while timing the operation.
+     *
+     * @param timedFunction the function to be timed
+     * @param meter the metering agent to be used to time the function
+     * @param opName the opname to be used when starting the meter
+     * @param <A> The param type of the function
+     * @param <B> The return type of the function
+     * @return the value returned by the timed function
+     */
+    private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
+                                               MeteringAgent meter, String opName) {
+        checkNotNull(timedFunction);
+        checkNotNull(meter);
+        checkNotNull(opName);
+        return new Function<A, B>() {
+            @Override
+            public B apply(A a) {
+                final MeteringAgent.Context context = meter.startTimer(opName);
+                B result = null;
+                try {
+                    result = timedFunction.apply(a);
+                    context.stop(null);
+                    return result;
+                } catch (Exception e) {
+                    context.stop(e);
+                    Throwables.propagate(e);
+                    return null;
+                }
+            }
+        };
+    }
+
+
+    private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
+        private ClusterMessageHandler handler;
+
+        public InternalClusterMessageHandler(ClusterMessageHandler handler) {
+            this.handler = handler;
+        }
+
+        @Override
+        public byte[] apply(Endpoint sender, byte[] bytes) {
+            ClusterMessage message = ClusterMessage.fromBytes(bytes);
+            handler.handle(message);
+            return message.response();
+        }
+    }
+
+    private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
+        private final Function<byte[], M> decoder;
+        private final Function<R, byte[]> encoder;
+        private final Function<M, CompletableFuture<R>> handler;
+
+        public InternalMessageResponder(Function<byte[], M> decoder,
+                                        Function<R, byte[]> encoder,
+                                        Function<M, CompletableFuture<R>> handler) {
+            this.decoder = decoder;
+            this.encoder = encoder;
+            this.handler = handler;
+        }
+
+        @Override
+        public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
+            return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
+                    apply(ClusterMessage.fromBytes(bytes).payload())).
+                    thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
+        }
+    }
+
+    private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
+        private final Function<byte[], M> decoder;
+        private final Consumer<M> consumer;
+
+        public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
+            this.decoder = decoder;
+            this.consumer = consumer;
+        }
+
+        @Override
+        public void accept(Endpoint sender, byte[] bytes) {
+            consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
+                    apply(ClusterMessage.fromBytes(bytes).payload()));
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 1bdac37..4602702 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -15,328 +15,24 @@
  */
 package org.onosproject.store.cluster.messaging.impl;
 
-import com.google.common.base.Throwables;
-import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.Tools;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.core.VersionService;
 import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.cluster.messaging.MessagingService;
-import org.onosproject.utils.MeteringAgent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Objects;
-
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.CLUSTER_WRITE;
 
 @Component(immediate = true)
 @Service
-public class ClusterCommunicationManager
-        implements ClusterCommunicationService {
+public class ClusterCommunicationManager extends AbstractClusterCommunicationManager {
 
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
-    private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
-
-    private static final String PRIMITIVE_NAME = "clusterCommunication";
-    private static final String SUBJECT_PREFIX = "subject";
-    private static final String ENDPOINT_PREFIX = "endpoint";
-
-    private static final String SERIALIZING = "serialization";
-    private static final String DESERIALIZING = "deserialization";
-    private static final String NODE_PREFIX = "node:";
-    private static final String ROUND_TRIP_SUFFIX = ".rtt";
-    private static final String ONE_WAY_SUFFIX = ".oneway";
+    private static final char VERSION_SEP = '-';
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ClusterService clusterService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected MessagingService messagingService;
-
-    private NodeId localNodeId;
-
-    @Activate
-    public void activate() {
-        localNodeId = clusterService.getLocalNode().id();
-        log.info("Started");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        log.info("Stopped");
-    }
+    private VersionService versionService;
 
     @Override
-    public <M> void broadcast(M message,
-                              MessageSubject subject,
-                              Function<M, byte[]> encoder) {
-        checkPermission(CLUSTER_WRITE);
-        multicast(message,
-                  subject,
-                  encoder,
-                  clusterService.getNodes()
-                      .stream()
-                      .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
-                      .map(ControllerNode::id)
-                      .collect(Collectors.toSet()));
-    }
-
-    @Override
-    public <M> void broadcastIncludeSelf(M message,
-                                         MessageSubject subject,
-                                         Function<M, byte[]> encoder) {
-        checkPermission(CLUSTER_WRITE);
-        multicast(message,
-                  subject,
-                  encoder,
-                  clusterService.getNodes()
-                      .stream()
-                      .map(ControllerNode::id)
-                      .collect(Collectors.toSet()));
-    }
-
-    @Override
-    public <M> CompletableFuture<Void> unicast(M message,
-                                               MessageSubject subject,
-                                               Function<M, byte[]> encoder,
-                                               NodeId toNodeId) {
-        checkPermission(CLUSTER_WRITE);
-        try {
-            byte[] payload = new ClusterMessage(
-                    localNodeId,
-                    subject,
-                    timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
-                    ).getBytes();
-            return doUnicast(subject, payload, toNodeId);
-        } catch (Exception e) {
-            return Tools.exceptionalFuture(e);
-        }
-    }
-
-    @Override
-    public <M> void multicast(M message,
-                              MessageSubject subject,
-                              Function<M, byte[]> encoder,
-                              Set<NodeId> nodes) {
-        checkPermission(CLUSTER_WRITE);
-        byte[] payload = new ClusterMessage(
-                localNodeId,
-                subject,
-                timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
-                .getBytes();
-        nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
-    }
-
-    @Override
-    public <M, R> CompletableFuture<R> sendAndReceive(M message,
-                                                      MessageSubject subject,
-                                                      Function<M, byte[]> encoder,
-                                                      Function<byte[], R> decoder,
-                                                      NodeId toNodeId) {
-        checkPermission(CLUSTER_WRITE);
-        try {
-            ClusterMessage envelope = new ClusterMessage(
-                    clusterService.getLocalNode().id(),
-                    subject,
-                    timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
-                            apply(message));
-            return sendAndReceive(subject, envelope.getBytes(), toNodeId).
-                    thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
-        } catch (Exception e) {
-            return Tools.exceptionalFuture(e);
-        }
-    }
-
-    private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
-        ControllerNode node = clusterService.getNode(toNodeId);
-        checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
-        Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
-        MeteringAgent.Context context = subjectMeteringAgent.startTimer(subject.toString() + ONE_WAY_SUFFIX);
-        return messagingService.sendAsync(nodeEp, subject.value(), payload).whenComplete((r, e) -> context.stop(e));
-    }
-
-    private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
-        ControllerNode node = clusterService.getNode(toNodeId);
-        checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
-        Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
-        MeteringAgent.Context epContext = endpointMeteringAgent.
-                startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
-        MeteringAgent.Context subjectContext = subjectMeteringAgent.
-                startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
-        return messagingService.sendAndReceive(nodeEp, subject.value(), payload).
-                whenComplete((bytes, throwable) -> {
-                    subjectContext.stop(throwable);
-                    epContext.stop(throwable);
-                });
-    }
-
-    @Override
-    public void addSubscriber(MessageSubject subject,
-                              ClusterMessageHandler subscriber,
-                              ExecutorService executor) {
-        checkPermission(CLUSTER_WRITE);
-        messagingService.registerHandler(subject.value(),
-                new InternalClusterMessageHandler(subscriber),
-                executor);
-    }
-
-    @Override
-    public void removeSubscriber(MessageSubject subject) {
-        checkPermission(CLUSTER_WRITE);
-        messagingService.unregisterHandler(subject.value());
-    }
-
-    @Override
-    public <M, R> void addSubscriber(MessageSubject subject,
-            Function<byte[], M> decoder,
-            Function<M, R> handler,
-            Function<R, byte[]> encoder,
-            Executor executor) {
-        checkPermission(CLUSTER_WRITE);
-        messagingService.registerHandler(subject.value(),
-                new InternalMessageResponder<M, R>(decoder, encoder, m -> {
-                    CompletableFuture<R> responseFuture = new CompletableFuture<>();
-                    executor.execute(() -> {
-                        try {
-                            responseFuture.complete(handler.apply(m));
-                        } catch (Exception e) {
-                            responseFuture.completeExceptionally(e);
-                        }
-                    });
-                    return responseFuture;
-                }));
-    }
-
-    @Override
-    public <M, R> void addSubscriber(MessageSubject subject,
-            Function<byte[], M> decoder,
-            Function<M, CompletableFuture<R>> handler,
-            Function<R, byte[]> encoder) {
-        checkPermission(CLUSTER_WRITE);
-        messagingService.registerHandler(subject.value(),
-                new InternalMessageResponder<>(decoder, encoder, handler));
-    }
-
-    @Override
-    public <M> void addSubscriber(MessageSubject subject,
-            Function<byte[], M> decoder,
-            Consumer<M> handler,
-            Executor executor) {
-        checkPermission(CLUSTER_WRITE);
-        messagingService.registerHandler(subject.value(),
-                new InternalMessageConsumer<>(decoder, handler),
-                executor);
-    }
-
-    /**
-     * Performs the timed function, returning the value it would while timing the operation.
-     *
-     * @param timedFunction the function to be timed
-     * @param meter the metering agent to be used to time the function
-     * @param opName the opname to be used when starting the meter
-     * @param <A> The param type of the function
-     * @param <B> The return type of the function
-     * @return the value returned by the timed function
-     */
-    private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
-                                               MeteringAgent meter, String opName) {
-        checkNotNull(timedFunction);
-        checkNotNull(meter);
-        checkNotNull(opName);
-        return new Function<A, B>() {
-            @Override
-            public B apply(A a) {
-                final MeteringAgent.Context context = meter.startTimer(opName);
-                B result = null;
-                try {
-                    result = timedFunction.apply(a);
-                    context.stop(null);
-                    return result;
-                } catch (Exception e) {
-                    context.stop(e);
-                    Throwables.propagate(e);
-                    return null;
-                }
-            }
-        };
-    }
-
-
-    private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
-        private ClusterMessageHandler handler;
-
-        public InternalClusterMessageHandler(ClusterMessageHandler handler) {
-            this.handler = handler;
-        }
-
-        @Override
-        public byte[] apply(Endpoint sender, byte[] bytes) {
-            ClusterMessage message = ClusterMessage.fromBytes(bytes);
-            handler.handle(message);
-            return message.response();
-        }
-    }
-
-    private class InternalMessageResponder<M, R> implements BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> {
-        private final Function<byte[], M> decoder;
-        private final Function<R, byte[]> encoder;
-        private final Function<M, CompletableFuture<R>> handler;
-
-        public InternalMessageResponder(Function<byte[], M> decoder,
-                                        Function<R, byte[]> encoder,
-                                        Function<M, CompletableFuture<R>> handler) {
-            this.decoder = decoder;
-            this.encoder = encoder;
-            this.handler = handler;
-        }
-
-        @Override
-        public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
-            return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
-                    apply(ClusterMessage.fromBytes(bytes).payload())).
-                    thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
-        }
-    }
-
-    private class InternalMessageConsumer<M> implements BiConsumer<Endpoint, byte[]> {
-        private final Function<byte[], M> decoder;
-        private final Consumer<M> consumer;
-
-        public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
-            this.decoder = decoder;
-            this.consumer = consumer;
-        }
-
-        @Override
-        public void accept(Endpoint sender, byte[] bytes) {
-            consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
-                    apply(ClusterMessage.fromBytes(bytes).payload()));
-        }
+    protected String getType(MessageSubject subject) {
+        return subject.value() + VERSION_SEP + versionService.version().toString();
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java
new file mode 100644
index 0000000..45c84af
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/UnifiedClusterCommunicationManager.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.messaging.impl;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+
+@Component(immediate = true)
+@Service
+public class UnifiedClusterCommunicationManager
+        extends AbstractClusterCommunicationManager
+        implements UnifiedClusterCommunicationService {
+    @Override
+    protected String getType(MessageSubject subject) {
+        return subject.value();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 136df56..5d9f453 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -60,6 +60,7 @@
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.Serializer;
+import org.onosproject.upgrade.UpgradeService;
 import org.slf4j.Logger;
 
 import com.google.common.base.Objects;
@@ -90,6 +91,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected UpgradeService upgradeService;
+
     private NodeId localNodeId;
 
     private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
@@ -155,8 +159,12 @@
 
         String leadershipTopic = createDeviceMastershipTopic(deviceId);
         Leadership leadership = leadershipService.runForLeadership(leadershipTopic);
-        return CompletableFuture.completedFuture(localNodeId.equals(leadership.leaderNodeId())
-                ? MastershipRole.MASTER : MastershipRole.STANDBY);
+        NodeId leader = leadership == null ? null : leadership.leaderNodeId();
+        List<NodeId> candidates = leadership == null ?
+                ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
+        MastershipRole role = Objects.equal(localNodeId, leader) ?
+                MastershipRole.MASTER : candidates.contains(localNodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
+        return CompletableFuture.completedFuture(role);
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
new file mode 100644
index 0000000..29045a2
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.io.File;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.UnifiedClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultPartition;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.persistence.PersistenceService;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncDocumentTree;
+import org.onosproject.store.service.AtomicCounterBuilder;
+import org.onosproject.store.service.AtomicCounterMapBuilder;
+import org.onosproject.store.service.AtomicIdGeneratorBuilder;
+import org.onosproject.store.service.AtomicValueBuilder;
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.ConsistentMultimapBuilder;
+import org.onosproject.store.service.ConsistentTreeMapBuilder;
+import org.onosproject.store.service.CoordinationService;
+import org.onosproject.store.service.DistributedSetBuilder;
+import org.onosproject.store.service.DocumentTreeBuilder;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+import org.onosproject.store.service.LeaderElectorBuilder;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Topic;
+import org.onosproject.store.service.TransactionContextBuilder;
+import org.onosproject.store.service.WorkQueue;
+import org.slf4j.Logger;
+
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of {@code CoordinationService} that uses a {@link StoragePartition} that spans all the nodes
+ * in the cluster regardless of version.
+ */
+@Service
+@Component(immediate = true)
+public class CoordinationManager implements CoordinationService {
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected UnifiedClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected UnifiedClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected PersistenceService persistenceService;
+
+    private StoragePartition partition;
+    private DistributedPrimitiveCreator primitiveCreator;
+
+    @Activate
+    public void activate() {
+        partition = new StoragePartition(
+                new DefaultPartition(
+                        PartitionId.SHARED,
+                        clusterService.getNodes()
+                                .stream()
+                                .map(ControllerNode::id)
+                                .collect(Collectors.toSet())),
+                null,
+                null,
+                clusterCommunicator,
+                clusterService,
+                new File(System.getProperty("karaf.data") + "/partitions/coordination"));
+        partition.open().join();
+        primitiveCreator = partition.client();
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new EventuallyConsistentMapBuilderImpl<>(clusterService,
+                clusterCommunicator,
+                persistenceService);
+    }
+
+    @Override
+    public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultConsistentMapBuilder<>(primitiveCreator);
+    }
+
+    @Override
+    public <V> DocumentTreeBuilder<V> documentTreeBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultDocumentTreeBuilder<>(primitiveCreator);
+    }
+
+    @Override
+    public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
+        return new DefaultConsistentTreeMapBuilder<>(primitiveCreator);
+    }
+
+    @Override
+    public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultConsistentMultimapBuilder<>(primitiveCreator);
+    }
+
+    @Override
+    public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultAtomicCounterMapBuilder<>(primitiveCreator);
+    }
+
+    @Override
+    public <E> DistributedSetBuilder<E> setBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultDistributedSetBuilder<>(() -> this.<E, Boolean>consistentMapBuilder());
+    }
+
+    @Override
+    public AtomicCounterBuilder atomicCounterBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultAtomicCounterBuilder(primitiveCreator);
+    }
+
+    @Override
+    public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultAtomicIdGeneratorBuilder(primitiveCreator);
+    }
+
+    @Override
+    public <V> AtomicValueBuilder<V> atomicValueBuilder() {
+        checkPermission(STORAGE_WRITE);
+        Supplier<ConsistentMapBuilder<String, byte[]>> mapBuilderSupplier =
+                () -> this.<String, byte[]>consistentMapBuilder()
+                          .withName("onos-atomic-values")
+                          .withSerializer(Serializer.using(KryoNamespaces.BASIC));
+        return new DefaultAtomicValueBuilder<>(mapBuilderSupplier);
+    }
+
+    @Override
+    public TransactionContextBuilder transactionContextBuilder() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public LeaderElectorBuilder leaderElectorBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultLeaderElectorBuilder(primitiveCreator);
+    }
+
+    @Override
+    public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
+        checkPermission(STORAGE_WRITE);
+        return primitiveCreator.newWorkQueue(name, serializer);
+    }
+
+    @Override
+    public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
+        checkPermission(STORAGE_WRITE);
+        return primitiveCreator.newAsyncDocumentTree(name, serializer);
+    }
+
+    @Override
+    public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(
+            String name, Serializer serializer) {
+        checkPermission(STORAGE_WRITE);
+        return primitiveCreator.newAsyncConsistentSetMultimap(name,
+                                                                serializer);
+    }
+
+    @Override
+    public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(
+            String name, Serializer serializer) {
+        checkPermission(STORAGE_WRITE);
+        return primitiveCreator.newAsyncConsistentTreeMap(name, serializer);
+    }
+
+    @Override
+    public <T> Topic<T> getTopic(String name, Serializer serializer) {
+        AsyncAtomicValue<T> atomicValue = this.<T>atomicValueBuilder()
+                                              .withName("topic-" + name)
+                                              .withSerializer(serializer)
+                                              .build();
+        return new DefaultDistributedTopic<>(atomicValue);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
index d15451e..1bfaaed 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
@@ -16,11 +16,11 @@
 package org.onosproject.store.primitives.impl;
 
 import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.MembershipService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
 
@@ -38,8 +38,8 @@
  */
 public class EventuallyConsistentMapBuilderImpl<K, V>
         implements EventuallyConsistentMapBuilder<K, V> {
-    private final ClusterService clusterService;
-    private final ClusterCommunicationService clusterCommunicator;
+    private final MembershipService clusterService;
+    private final ClusterCommunicator clusterCommunicator;
 
     private String name;
     private KryoNamespace serializer;
@@ -64,8 +64,8 @@
      * @param clusterCommunicator cluster communication service
      * @param persistenceService persistence service
      */
-    public EventuallyConsistentMapBuilderImpl(ClusterService clusterService,
-                                              ClusterCommunicationService clusterCommunicator,
+    public EventuallyConsistentMapBuilderImpl(MembershipService clusterService,
+                                              ClusterCommunicator clusterCommunicator,
                                               PersistenceService persistenceService) {
         this.persistenceService = persistenceService;
         this.clusterService = checkNotNull(clusterService);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index bed19d5..461245e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -25,13 +25,13 @@
 import org.onlab.util.AbstractAccumulator;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.SlidingWindowCounter;
-import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.MembershipService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.LogicalTimestamp;
 import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.DistributedPrimitive;
@@ -86,8 +86,8 @@
 
     private final Map<K, MapValue<V>> items;
 
-    private final ClusterService clusterService;
-    private final ClusterCommunicationService clusterCommunicator;
+    private final MembershipService clusterService;
+    private final ClusterCommunicator clusterCommunicator;
     private final Serializer serializer;
     private final NodeId localNodeId;
     private final PersistenceService persistenceService;
@@ -162,8 +162,8 @@
      * @param persistenceService    persistence service
      */
     EventuallyConsistentMapImpl(String mapName,
-                                ClusterService clusterService,
-                                ClusterCommunicationService clusterCommunicator,
+                                MembershipService clusterService,
+                                ClusterCommunicator clusterCommunicator,
                                 KryoNamespace ns,
                                 BiFunction<K, V, Timestamp> timestampProvider,
                                 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 500b75c..4a92682 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -42,8 +42,10 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.PartitionDiff;
 import org.onosproject.cluster.PartitionId;
+import org.onosproject.core.Version;
+import org.onosproject.core.VersionService;
 import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.primitives.PartitionAdminService;
 import org.onosproject.store.primitives.PartitionEvent;
@@ -51,6 +53,7 @@
 import org.onosproject.store.primitives.PartitionService;
 import org.onosproject.store.service.PartitionClientInfo;
 import org.onosproject.store.service.PartitionInfo;
+import org.onosproject.upgrade.UpgradeService;
 import org.slf4j.Logger;
 
 import static org.onosproject.security.AppGuard.checkPermission;
@@ -68,7 +71,7 @@
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterCommunicationService clusterCommunicator;
+    protected UnifiedClusterCommunicationService clusterCommunicator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterMetadataService metadataService;
@@ -76,7 +79,14 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
-    private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected UpgradeService upgradeService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected VersionService versionService;
+
+    private final Map<PartitionId, StoragePartition> inactivePartitions = Maps.newConcurrentMap();
+    private final Map<PartitionId, StoragePartition> activePartitions = Maps.newConcurrentMap();
     private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
     private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
 
@@ -85,17 +95,58 @@
         eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
         currentClusterMetadata.set(metadataService.getClusterMetadata());
         metadataService.addListener(metadataListener);
-        currentClusterMetadata.get()
-                       .getPartitions()
-                       .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
-                               clusterCommunicator,
-                               clusterService,
-                               new File(System.getProperty("karaf.data") + "/partitions/" + partition.getId()))));
 
-        CompletableFuture<Void> openFuture = CompletableFuture.allOf(partitions.values()
-                                                                               .stream()
-                                                                               .map(StoragePartition::open)
-                                                                               .toArray(CompletableFuture[]::new));
+        // If an upgrade is currently in progress and this node is an upgraded node, initialize upgrade partitions.
+        CompletableFuture<Void> openFuture;
+        if (upgradeService.isUpgrading() && upgradeService.isLocalUpgraded()) {
+            Version sourceVersion = upgradeService.getState().source();
+            Version targetVersion = upgradeService.getState().target();
+            currentClusterMetadata.get()
+                    .getPartitions()
+                    .forEach(partition -> inactivePartitions.put(partition.getId(), new StoragePartition(
+                            partition,
+                            sourceVersion,
+                            null,
+                            clusterCommunicator,
+                            clusterService,
+                            new File(System.getProperty("karaf.data") +
+                                    "/partitions/" + sourceVersion + "/" + partition.getId()))));
+            currentClusterMetadata.get()
+                    .getPartitions()
+                    .forEach(partition -> activePartitions.put(partition.getId(), new StoragePartition(
+                            partition,
+                            targetVersion,
+                            sourceVersion,
+                            clusterCommunicator,
+                            clusterService,
+                            new File(System.getProperty("karaf.data") +
+                                    "/partitions/" + targetVersion + "/" + partition.getId()))));
+
+            // We have to fork existing partitions before we can start inactive partition servers to
+            // avoid duplicate message handlers when both servers are running.
+            openFuture = CompletableFuture.allOf(activePartitions.values().stream()
+                    .map(StoragePartition::open)
+                    .toArray(CompletableFuture[]::new))
+                    .thenCompose(v -> CompletableFuture.allOf(inactivePartitions.values().stream()
+                            .map(StoragePartition::open)
+                            .toArray(CompletableFuture[]::new)));
+        } else {
+            Version version = versionService.version();
+            currentClusterMetadata.get()
+                    .getPartitions()
+                    .forEach(partition -> activePartitions.put(partition.getId(), new StoragePartition(
+                            partition,
+                            version,
+                            null,
+                            clusterCommunicator,
+                            clusterService,
+                            new File(System.getProperty("karaf.data") +
+                                    "/partitions/" + version + "/" + partition.getId()))));
+            openFuture = CompletableFuture.allOf(activePartitions.values().stream()
+                    .map(StoragePartition::open)
+                    .toArray(CompletableFuture[]::new));
+        }
+
         openFuture.join();
         log.info("Started");
     }
@@ -105,10 +156,13 @@
         metadataService.removeListener(metadataListener);
         eventDispatcher.removeSink(PartitionEvent.class);
 
-        CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
-                                                                                .stream()
-                                                                                .map(StoragePartition::close)
-                                                                                .toArray(CompletableFuture[]::new));
+        CompletableFuture<Void> closeFuture = CompletableFuture.allOf(
+                CompletableFuture.allOf(inactivePartitions.values().stream()
+                        .map(StoragePartition::close)
+                        .toArray(CompletableFuture[]::new)),
+                CompletableFuture.allOf(activePartitions.values().stream()
+                        .map(StoragePartition::close)
+                        .toArray(CompletableFuture[]::new)));
         closeFuture.join();
         log.info("Stopped");
     }
@@ -116,25 +170,25 @@
     @Override
     public int getNumberOfPartitions() {
         checkPermission(PARTITION_READ);
-        return partitions.size();
+        return activePartitions.size();
     }
 
     @Override
     public Set<PartitionId> getAllPartitionIds() {
         checkPermission(PARTITION_READ);
-        return partitions.keySet();
+        return activePartitions.keySet();
     }
 
     @Override
     public DistributedPrimitiveCreator getDistributedPrimitiveCreator(PartitionId partitionId) {
         checkPermission(PARTITION_READ);
-        return partitions.get(partitionId).client();
+        return activePartitions.get(partitionId).client();
     }
 
     @Override
     public Set<NodeId> getConfiguredMembers(PartitionId partitionId) {
         checkPermission(PARTITION_READ);
-        StoragePartition partition = partitions.get(partitionId);
+        StoragePartition partition = activePartitions.get(partitionId);
         return ImmutableSet.copyOf(partition.getMembers());
     }
 
@@ -148,7 +202,7 @@
 
     @Override
     public List<PartitionInfo> partitionInfo() {
-        return partitions.values()
+        return activePartitions.values()
                          .stream()
                          .flatMap(x -> Tools.stream(x.info()))
                          .collect(Collectors.toList());
@@ -161,7 +215,7 @@
                     .values()
                     .stream()
                     .filter(PartitionDiff::hasChanged)
-                    .forEach(diff -> partitions.get(diff.partitionId()).onUpdate(diff.newValue()));
+                    .forEach(diff -> activePartitions.get(diff.partitionId()).onUpdate(diff.newValue()));
     }
 
     private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
@@ -173,7 +227,7 @@
 
     @Override
     public List<PartitionClientInfo> partitionClientInfo() {
-        return partitions.values()
+        return activePartitions.values()
                          .stream()
                          .map(StoragePartition::client)
                          .map(StoragePartitionClient::clientInfo)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
index 40e9fa0..8bae2a3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
@@ -40,8 +40,7 @@
 import io.atomix.protocols.raft.protocol.ResetRequest;
 import io.atomix.protocols.raft.session.SessionId;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
 import org.onosproject.store.service.Serializer;
 
 /**
@@ -50,10 +49,10 @@
 public class RaftClientCommunicator extends RaftCommunicator implements RaftClientProtocol {
 
     public RaftClientCommunicator(
-            PartitionId partitionId,
+            String prefix,
             Serializer serializer,
-            ClusterCommunicationService clusterCommunicator) {
-        super(new RaftMessageContext(String.format("partition-%d", partitionId.id())), serializer, clusterCommunicator);
+            ClusterCommunicator clusterCommunicator) {
+        super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
index 1117ab9..765eb02 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
@@ -22,7 +22,7 @@
 import io.atomix.protocols.raft.RaftException;
 import io.atomix.protocols.raft.cluster.MemberId;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.cluster.messaging.MessagingException;
 import org.onosproject.store.service.Serializer;
@@ -35,12 +35,12 @@
 public abstract class RaftCommunicator {
     protected final RaftMessageContext context;
     protected final Serializer serializer;
-    protected final ClusterCommunicationService clusterCommunicator;
+    protected final ClusterCommunicator clusterCommunicator;
 
     public RaftCommunicator(
             RaftMessageContext context,
             Serializer serializer,
-            ClusterCommunicationService clusterCommunicator) {
+            ClusterCommunicator clusterCommunicator) {
         this.context = checkNotNull(context, "context cannot be null");
         this.serializer = checkNotNull(serializer, "serializer cannot be null");
         this.clusterCommunicator = checkNotNull(clusterCommunicator, "clusterCommunicator cannot be null");
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
index 9b8f3e6..2710a2c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
@@ -56,8 +56,8 @@
 import io.atomix.protocols.raft.protocol.VoteResponse;
 import io.atomix.protocols.raft.session.SessionId;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
 import org.onosproject.store.service.Serializer;
 
 /**
@@ -66,10 +66,10 @@
 public class RaftServerCommunicator extends RaftCommunicator implements RaftServerProtocol {
 
     public RaftServerCommunicator(
-            PartitionId partitionId,
+            String prefix,
             Serializer serializer,
-            ClusterCommunicationService clusterCommunicator) {
-        super(new RaftMessageContext(String.format("partition-%d", partitionId.id())), serializer, clusterCommunicator);
+            ClusterCommunicator clusterCommunicator) {
+        super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 822641c..59c4b17 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -15,10 +15,6 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
-import static org.slf4j.LoggerFactory.getLogger;
-
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -26,25 +22,26 @@
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.PartitionId;
+import org.onosproject.cluster.UnifiedClusterService;
 import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.primitives.PartitionAdminService;
 import org.onosproject.store.primitives.PartitionService;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncAtomicValue;
-import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.AsyncConsistentMultimap;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.AtomicCounterBuilder;
 import org.onosproject.store.service.AtomicCounterMapBuilder;
 import org.onosproject.store.service.AtomicIdGeneratorBuilder;
@@ -68,7 +65,9 @@
 import org.onosproject.store.service.WorkQueueStats;
 import org.slf4j.Logger;
 
-import com.google.common.collect.Maps;
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Implementation for {@code StorageService} and {@code StorageAdminService}.
@@ -82,10 +81,10 @@
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterService clusterService;
+    protected UnifiedClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterCommunicationService clusterCommunicator;
+    protected UnifiedClusterCommunicationService clusterCommunicator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PersistenceService persistenceService;
@@ -105,7 +104,7 @@
     public void activate() {
         Map<PartitionId, DistributedPrimitiveCreator> partitionMap = Maps.newHashMap();
         partitionService.getAllPartitionIds().stream()
-            .filter(id -> !id.equals(PartitionId.from(0)))
+            .filter(id -> !id.equals(PartitionId.SHARED))
             .forEach(id -> partitionMap.put(id, partitionService.getDistributedPrimitiveCreator(id)));
         federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap, BUCKETS);
         transactionManager = new TransactionManager(this, partitionService, BUCKETS);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 842e047..414c49c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -29,11 +29,12 @@
 import com.google.common.collect.ImmutableMap;
 import io.atomix.protocols.raft.cluster.MemberId;
 import io.atomix.protocols.raft.service.RaftService;
-import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.MembershipService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.Partition;
 import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.core.Version;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
 import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapService;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapService;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapService;
@@ -53,8 +54,11 @@
 public class StoragePartition implements Managed<StoragePartition> {
 
     private final AtomicBoolean isOpened = new AtomicBoolean(false);
-    private final ClusterCommunicationService clusterCommunicator;
-    private final File logFolder;
+    private final UnifiedClusterCommunicationService clusterCommunicator;
+    private final MembershipService clusterService;
+    private final Version version;
+    private final Version source;
+    private final File dataFolder;
     private Partition partition;
     private NodeId localNodeId;
     private StoragePartitionServer server;
@@ -77,14 +81,20 @@
                             () -> new AtomixDocumentTreeService(Ordering.INSERTION))
                     .build();
 
-    public StoragePartition(Partition partition,
-            ClusterCommunicationService clusterCommunicator,
-            ClusterService clusterService,
-            File logFolder) {
+    public StoragePartition(
+            Partition partition,
+            Version version,
+            Version source,
+            UnifiedClusterCommunicationService clusterCommunicator,
+            MembershipService clusterService,
+            File dataFolder) {
         this.partition = partition;
+        this.version = version;
+        this.source = source;
         this.clusterCommunicator = clusterCommunicator;
+        this.clusterService = clusterService;
         this.localNodeId = clusterService.getLocalNode().id();
-        this.logFolder = logFolder;
+        this.dataFolder = dataFolder;
     }
 
     /**
@@ -97,7 +107,12 @@
 
     @Override
     public CompletableFuture<Void> open() {
-        if (partition.getMembers().contains(localNodeId)) {
+        if (source != null) {
+            return forkServer(source)
+                    .thenCompose(v -> openClient())
+                    .thenAccept(v -> isOpened.set(true))
+                    .thenApply(v -> null);
+        } else if (partition.getMembers().contains(localNodeId)) {
             return openServer()
                     .thenCompose(v -> openClient())
                     .thenAccept(v -> isOpened.set(true))
@@ -116,6 +131,43 @@
     }
 
     /**
+     * Returns the partition name.
+     *
+     * @return the partition name
+     */
+    public String getName() {
+        return getName(version);
+    }
+
+    /**
+     * Returns the partition name for the given version.
+     *
+     * @param version the version for which to return the partition name
+     * @return the partition name for the given version
+     */
+    String getName(Version version) {
+        return version != null ? String.format("partition-%d-%s", partition.getId().id(), version) : "partition-core";
+    }
+
+    /**
+     * Returns the partition version.
+     *
+     * @return the partition version
+     */
+    public Version getVersion() {
+        return version;
+    }
+
+    /**
+     * Returns the partition data folder.
+     *
+     * @return the partition data folder
+     */
+    public File getDataFolder() {
+        return dataFolder;
+    }
+
+    /**
      * Returns the identifier of the {@link Partition partition} associated with this instance.
      * @return partition identifier
      */
@@ -136,7 +188,23 @@
      * @return partition member identifiers
      */
     public Collection<MemberId> getMemberIds() {
-        return Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
+        return source != null ?
+                clusterService.getNodes()
+                        .stream()
+                        .map(node -> MemberId.from(node.id().id()))
+                        .collect(Collectors.toList()) :
+                Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
+    }
+
+    Collection<MemberId> getMemberIds(Version version) {
+        if (source == null || version.equals(source)) {
+            return Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
+        } else {
+            return clusterService.getNodes()
+                    .stream()
+                    .map(node -> MemberId.from(node.id().id()))
+                    .collect(Collectors.toList());
+        }
     }
 
     /**
@@ -144,20 +212,37 @@
      * @return future that is completed after the operation is complete
      */
     private CompletableFuture<Void> openServer() {
-        if (!partition.getMembers().contains(localNodeId) || server != null) {
-            return CompletableFuture.completedFuture(null);
-        }
-        StoragePartitionServer server = new StoragePartitionServer(this,
+        StoragePartitionServer server = new StoragePartitionServer(
+                this,
                 MemberId.from(localNodeId.id()),
-                () -> new RaftServerCommunicator(
-                        partition.getId(),
-                        Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
-                        clusterCommunicator),
-                logFolder);
+                clusterCommunicator);
         return server.open().thenRun(() -> this.server = server);
     }
 
     /**
+     * Forks the server from the given version.
+     *
+     * @return future to be completed once the server has been forked
+     */
+    private CompletableFuture<Void> forkServer(Version version) {
+        StoragePartitionServer server = new StoragePartitionServer(
+                this,
+                MemberId.from(localNodeId.id()),
+                clusterCommunicator);
+
+        CompletableFuture<Void> future;
+        if (clusterService.getNodes().size() == 1) {
+            future = server.fork(version);
+        } else {
+            future = server.join(clusterService.getNodes().stream()
+                    .filter(node -> !node.id().equals(localNodeId))
+                    .map(node -> MemberId.from(node.id().id()))
+                    .collect(Collectors.toList()));
+        }
+        return future.thenRun(() -> this.server = server);
+    }
+
+    /**
      * Attempts to join the partition as a new member.
      * @return future that is completed after the operation is complete
      */
@@ -168,11 +253,7 @@
                  .collect(Collectors.toSet());
         StoragePartitionServer server = new StoragePartitionServer(this,
                 MemberId.from(localNodeId.id()),
-                () -> new RaftServerCommunicator(
-                        partition.getId(),
-                        Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
-                        clusterCommunicator),
-                logFolder);
+                clusterCommunicator);
         return server.join(Collections2.transform(otherMembers, n -> MemberId.from(n.id())))
                 .thenRun(() -> this.server = server);
     }
@@ -181,7 +262,7 @@
         client = new StoragePartitionClient(this,
                 MemberId.from(localNodeId.id()),
                 new RaftClientCommunicator(
-                        partition.getId(),
+                        getName(),
                         Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
                         clusterCommunicator));
         return client.open().thenApply(v -> client);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 8f1ffa3..96e5140 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -92,7 +92,6 @@
                 log.info("Failed to start client for partition {}", partition.getId(), e);
             }
         }).thenApply(v -> null);
-
     }
 
     @Override
@@ -316,7 +315,7 @@
     private RaftClient newRaftClient(RaftClientProtocol protocol) {
         return RaftClient.newBuilder()
                 .withClientId("partition-" + partition.getId())
-                .withMemberId(MemberId.from(localMemberId.id()))
+                .withMemberId(localMemberId)
                 .withProtocol(protocol)
                 .build();
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
index 04c0bdf..1bed89d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
@@ -99,7 +99,7 @@
     public PartitionInfo toPartitionInfo() {
         Function<RaftMember, String> memberToString =
                 m -> m == null ? "none" : m.memberId().toString();
-        return new PartitionInfo(partitionId.toString(),
+        return new PartitionInfo(partitionId,
                 leaderTerm,
                 activeMembers.stream().map(memberToString).collect(Collectors.toList()),
                 memberToString.apply(leader));
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index bb071a2..123c3b6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -16,16 +16,19 @@
 package org.onosproject.store.primitives.impl;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
 
 import io.atomix.protocols.raft.RaftServer;
 import io.atomix.protocols.raft.cluster.MemberId;
-import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.cluster.RaftMember;
 import io.atomix.protocols.raft.storage.RaftStorage;
 import io.atomix.storage.StorageLevel;
+import org.onosproject.core.Version;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
 import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
 import org.onosproject.store.service.PartitionInfo;
 import org.onosproject.store.service.Serializer;
@@ -46,23 +49,21 @@
 
     private final MemberId localMemberId;
     private final StoragePartition partition;
-    private final Supplier<RaftServerProtocol> protocol;
-    private final File dataFolder;
+    private final UnifiedClusterCommunicationService clusterCommunicator;
     private RaftServer server;
 
     public StoragePartitionServer(
             StoragePartition partition,
             MemberId localMemberId,
-            Supplier<RaftServerProtocol> protocol,
-            File dataFolder) {
+            UnifiedClusterCommunicationService clusterCommunicator) {
         this.partition = partition;
         this.localMemberId = localMemberId;
-        this.protocol = protocol;
-        this.dataFolder = dataFolder;
+        this.clusterCommunicator = clusterCommunicator;
     }
 
     @Override
     public CompletableFuture<Void> open() {
+        log.info("Starting server for partition {} ({})", partition.getId(), partition.getVersion());
         CompletableFuture<RaftServer> serverOpenFuture;
         if (partition.getMemberIds().contains(localMemberId)) {
             if (server != null && server.isRunning()) {
@@ -77,9 +78,11 @@
         }
         return serverOpenFuture.whenComplete((r, e) -> {
             if (e == null) {
-                log.info("Successfully started server for partition {}", partition.getId());
+                log.info("Successfully started server for partition {} ({})",
+                        partition.getId(), partition.getVersion());
             } else {
-                log.info("Failed to start server for partition {}", partition.getId(), e);
+                log.info("Failed to start server for partition {} ({})",
+                        partition.getId(), partition.getVersion(), e);
             }
         }).thenApply(v -> null);
     }
@@ -97,16 +100,68 @@
         return server.leave();
     }
 
-    private RaftServer buildServer() {
+    /**
+     * Forks the existing partition into a new partition.
+     *
+     * @param version the version from which to fork the server
+     * @return future to be completed once the fork operation is complete
+     */
+    public CompletableFuture<Void> fork(Version version) {
+        log.info("Forking server for partition {} ({}->{})", partition.getId(), version, partition.getVersion());
         RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
-                .withName("partition-" + partition.getId())
-                .withProtocol(protocol.get())
+                .withName(partition.getName(version))
+                .withType(RaftMember.Type.PASSIVE)
+                .withProtocol(new RaftServerCommunicator(
+                        partition.getName(version),
+                        Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+                        clusterCommunicator))
                 .withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
                 .withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
                 .withStorage(RaftStorage.newBuilder()
                         .withStorageLevel(StorageLevel.MAPPED)
                         .withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
-                        .withDirectory(dataFolder)
+                        .withDirectory(partition.getDataFolder())
+                        .withMaxSegmentSize(MAX_SEGMENT_SIZE)
+                        .build());
+        StoragePartition.RAFT_SERVICES.forEach(builder::addService);
+        RaftServer server = builder.build();
+        return server.join(partition.getMemberIds(version))
+                .thenCompose(v -> server.shutdown())
+                .thenCompose(v -> {
+                    // Delete the cluster configuration file from the forked partition.
+                    try {
+                        Files.delete(new File(partition.getDataFolder(), "atomix.conf").toPath());
+                    } catch (IOException e) {
+                        log.error("Failed to delete partition configuration: {}", e);
+                    }
+
+                    // Build and bootstrap a new server.
+                    this.server = buildServer();
+                    return this.server.bootstrap();
+                }).whenComplete((r, e) -> {
+                    if (e == null) {
+                        log.info("Successfully forked server for partition {} ({}->{})",
+                                partition.getId(), version, partition.getVersion());
+                    } else {
+                        log.info("Failed to fork server for partition {} ({}->{})",
+                                partition.getId(), version, partition.getVersion(), e);
+                    }
+                }).thenApply(v -> null);
+    }
+
+    private RaftServer buildServer() {
+        RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
+                .withName(partition.getName())
+                .withProtocol(new RaftServerCommunicator(
+                        partition.getName(),
+                        Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+                        clusterCommunicator))
+                .withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
+                .withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
+                .withStorage(RaftStorage.newBuilder()
+                        .withStorageLevel(StorageLevel.MAPPED)
+                        .withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
+                        .withDirectory(partition.getDataFolder())
                         .withMaxSegmentSize(MAX_SEGMENT_SIZE)
                         .build());
         StoragePartition.RAFT_SERVICES.forEach(builder::addService);
@@ -114,12 +169,13 @@
     }
 
     public CompletableFuture<Void> join(Collection<MemberId> otherMembers) {
+        log.info("Joining partition {} ({})", partition.getId(), partition.getVersion());
         server = buildServer();
         return server.join(otherMembers).whenComplete((r, e) -> {
             if (e == null) {
-                log.info("Successfully joined partition {}", partition.getId());
+                log.info("Successfully joined partition {} ({})", partition.getId(), partition.getVersion());
             } else {
-                log.info("Failed to join partition {}", partition.getId(), e);
+                log.info("Failed to join partition {} ({})", partition.getId(), partition.getVersion(), e);
             }
         }).thenApply(v -> null);
     }
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
index 6ed8ec3..be24d00 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -98,7 +98,6 @@
 import org.junit.Before;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.primitives.impl.RaftClientCommunicator;
 import org.onosproject.store.primitives.impl.RaftServerCommunicator;
 import org.onosproject.store.service.Serializer;
@@ -382,7 +381,7 @@
         RaftServer.Builder builder = RaftServer.newBuilder(member.memberId())
                 .withType(member.getType())
                 .withProtocol(new RaftServerCommunicator(
-                        PartitionId.from(1),
+                        "partition-1",
                         PROTOCOL_SERIALIZER,
                         communicationServiceFactory.newCommunicationService(NodeId.nodeId(member.memberId().id()))))
                 .withStorage(RaftStorage.newBuilder()
@@ -406,7 +405,7 @@
         RaftClient client = RaftClient.newBuilder()
                 .withMemberId(memberId)
                 .withProtocol(new RaftClientCommunicator(
-                        PartitionId.from(1),
+                        "partition-1",
                         PROTOCOL_SERIALIZER,
                         communicationServiceFactory.newCommunicationService(NodeId.nodeId(memberId.id()))))
                 .build();
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 2de7dcc..d5deae1 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -250,6 +250,7 @@
 import org.onosproject.store.service.Versioned;
 import org.onosproject.store.service.WorkQueueStats;
 import org.onosproject.ui.model.topo.UiTopoLayoutId;
+import org.onosproject.upgrade.Upgrade;
 
 import java.net.URI;
 import java.time.Duration;
@@ -627,6 +628,8 @@
                     PiCriterion.class,
                     PiInstruction.class
             )
+            .register(Upgrade.class)
+            .register(Upgrade.Status.class)
             .build("API");
 
     /**
diff --git a/tools/dev/bash_profile b/tools/dev/bash_profile
index 3a3c3c6..991cfaf 100644
--- a/tools/dev/bash_profile
+++ b/tools/dev/bash_profile
@@ -174,6 +174,25 @@
     echo $OCI
 }
 
+# Sets minority (OCMI) and majority (OCMA) variables
+function setMinorityMajority {
+    nodes=($(env | grep 'OC[0-9]*=' | sort | cut -d= -f2))
+    middle=$(expr "${#nodes[@]}" / "2")
+    index=0
+    min=1
+    maj=1
+    for node in "${nodes[@]}"; do
+        if [ "$index" -gt "$middle" ]; then
+            export OCMI${min}=${node}
+            min=$(expr $min + 1)
+        else
+            export OCMA${maj}=${node}
+            maj=$(expr $maj + 1)
+        fi
+        index=$(expr $index + 1)
+    done
+}
+
 # ON.Lab shared test cell warden address
 export CELL_WARDEN="10.254.1.19"
 export CELL_SLAVES="$CELL_WARDEN 10.254.1.18 10.254.1.17"
@@ -184,6 +203,8 @@
     unset OCI OCN OCT ONOS_INSTANCES ONOS_FEATURES
     unset ONOS_USER ONOS_GROUP ONOS_WEB_USER ONOS_WEB_PASS
     unset $(env | sed -n 's:\(^OC[0-9]\{1,\}\)=.*:\1 :g p')
+    unset $(env | sed -n 's:\(^OCMI[0-9]\{1,\}\)=.*:\1 :g p')
+    unset $(env | sed -n 's:\(^OCMA[0-9]\{1,\}\)=.*:\1 :g p')
 }
 
 # Applies the settings in the specified cell file or lists current cell definition
@@ -204,6 +225,7 @@
         . $aux
         rm -f $aux
         setPrimaryInstance 1 >/dev/null
+        setMinorityMajority >/dev/null
         onos-verify-cell
         topo default >/dev/null
         ;;
diff --git a/tools/test/bin/onos-install b/tools/test/bin/onos-install
index cc00da0..65b7a42 100755
--- a/tools/test/bin/onos-install
+++ b/tools/test/bin/onos-install
@@ -36,7 +36,7 @@
 
 onos-check-bits
 
-while getopts fnm: o; do
+while getopts fnvm: o; do
     case "$o" in
         f) uninstall=true;;
         u) noupstart=true; noinitd=true; nosysd=true;;
@@ -44,6 +44,7 @@
         s) nosysd=true;;
         n) nostart=true;;
         m) mvn_settings=$OPTARG;;
+        v) upgrade=true;;
     esac
 done
 let OPC=$OPTIND-1
@@ -60,7 +61,7 @@
 [ ! -z "$mvn_settings" ] && scp -q $mvn_settings $remote:/tmp/settings.xml
 
 ssh -tt $remote "
-    [ -d $ONOS_INSTALL_DIR/bin ] && echo \"ONOS is already installed\" && exit 1
+    [ -z "$upgrade" ] && [ -d $ONOS_INSTALL_DIR/bin ] && echo \"ONOS is already installed\" && exit 1
 
     # Prepare a landing zone and unroll the bits
     sudo mkdir -p $ONOS_INSTALL_DIR && sudo chown ${ONOS_USER}:${ONOS_GROUP} $ONOS_INSTALL_DIR
@@ -81,6 +82,12 @@
     # Set up correct user to run onos-service
     echo 'export ONOS_USER=$ONOS_USER' >> $ONOS_INSTALL_DIR/options
 
+    # If the upgrade flag is set, append ".upgrade" to the version string.
+    if [ ! -z "$upgrade" ]
+    then
+        echo '.upgrade' >> $ONOS_INSTALL_DIR/VERSION
+    fi
+
     # Remove any previous ON.Lab bits from ~/.m2 repo.
     rm -fr ~/.m2/repository/org/onosproject
 
diff --git a/tools/test/scenarios/upgrade-rollback.xml b/tools/test/scenarios/upgrade-rollback.xml
new file mode 100644
index 0000000..4e58829
--- /dev/null
+++ b/tools/test/scenarios/upgrade-rollback.xml
@@ -0,0 +1,125 @@
+<!--
+  ~ Copyright 2017-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<scenario name="upgrade-rollback" description="ONOS cluster upgrade and rollback">
+    <group name="Upgrade-Rollback">
+        <step name="Push-Bits" exec="onos-push-bits-through-proxy" if="${OCT}"/>
+
+        <step name="Initialize-Upgrade"
+              exec="onos ${OC1} issu init"/>
+
+        <group name="Phase-1">
+            <sequential var="${OCMI#}"
+                        starts="Phase-One-Stop-Service-${#}"
+                        ends="Phase-One-Wait-for-Start-${#-1}">
+                <step name="Phase-One-Stop-Service-${#}"
+                      exec="onos-service ${OCMI#} stop"
+                      requires="Initialize-Upgrade"/>
+
+                <step name="Phase-One-Wait-for-Stop-${#}"
+                      exec="onos-wait-for-stop ${OCMI#}"
+                      requires="~Phase-One-Stop-Service-${#}"/>
+
+                <step name="Phase-One-Uninstall-${#}"
+                      exec="onos-uninstall ${OCMI#}"
+                      requires="~Phase-One-Wait-for-Stop-${#}"/>
+
+                <step name="Phase-One-Push-Bits-${#}"
+                      exec="onos-push-bits ${OCMI#}"
+                      unless="${OCT}"
+                      requires="~Phase-One-Stop-Service-${#}"/>
+
+                <step name="Phase-One-Install-Upgrade-${#}"
+                      exec="onos-install -v ${OCMI#}"
+                      requires="Phase-One-Push-Bits-${#},Push-Bits,Phase-One-Uninstall-${#}"/>
+
+                <step name="Phase-One-Secure-SSH-${#}"
+                      exec="onos-secure-ssh -u ${ONOS_WEB_USER} -p ${ONOS_WEB_PASS} ${OCMI#}"
+                      requires="~Phase-One-Install-Upgrade-${#}"/>
+
+                <step name="Phase-One-Wait-for-Start-${#}"
+                      exec="onos-wait-for-start ${OCMI#}"
+                      requires="Phase-One-Secure-SSH-${#}"/>
+            </sequential>
+        </group>
+
+        <step name="Run-Upgrade"
+              exec="onos ${OC1} issu upgrade"
+              requires="Phase-1"/>
+
+        <step name="Run-Rollback"
+              exec="onos ${OC1} issu rollback"
+              requires="Run-Upgrade"/>
+
+        <group name="Phase-2" requires="Run-Rollback">
+            <sequential var="${OCMI#}"
+                        starts="Phase-Two-Stop-Service-${#}"
+                        ends="Phase-Two-Wait-for-Start-${#-1}">
+                <step name="Phase-Two-Stop-Service-${#}"
+                      exec="onos-service ${OCMI#} stop"
+                      requires="Run-Rollback"/>
+
+                <step name="Phase-Two-Wait-for-Stop-${#}"
+                      exec="onos-wait-for-stop ${OCMI#}"
+                      requires="~Phase-Two-Stop-Service-${#}"/>
+
+                <step name="Phase-Two-Uninstall-${#}"
+                      exec="onos-uninstall ${OCMI#}"
+                      requires="~Phase-Two-Wait-for-Stop-${#}"/>
+
+                <step name="Phase-Two-Push-Bits-${#}"
+                      exec="onos-push-bits ${OCMI#}"
+                      unless="${OCT}"
+                      requires="~Phase-Two-Stop-Service-${#}"/>
+
+                <step name="Phase-Two-Install-Upgrade-${#}"
+                      exec="onos-install ${OCMI#}"
+                      requires="Phase-Two-Push-Bits-${#},Push-Bits,Phase-Two-Uninstall-${#}"/>
+
+                <step name="Phase-Two-Secure-SSH-${#}"
+                      exec="onos-secure-ssh -u ${ONOS_WEB_USER} -p ${ONOS_WEB_PASS} ${OCMI#}"
+                      requires="~Phase-Two-Install-Upgrade-${#}"/>
+
+                <step name="Phase-Two-Wait-for-Start-${#}"
+                      exec="onos-wait-for-start ${OCMI#}"
+                      requires="Phase-Two-Secure-SSH-${#}"/>
+            </sequential>
+        </group>
+
+        <step name="Reset-Upgrade"
+              exec="onos ${OC1} issu reset"
+              requires="Phase-2"/>
+
+        <group name="Verify-Rollback" requires="Reset-Upgrade">
+            <parallel var="${OC#}">
+                <step name="Check-Nodes-${#}"
+                      exec="onos-check-nodes ${OC#}"
+                      delay="3"
+                      requires="Reset-Upgrade"/>
+                <step name="Check-Components-${#}"
+                      exec="onos-check-components ${OC#}"
+                      delay="5"
+                      requires="~Check-Nodes-${#}"/>
+
+                <step name="Check-Logs-${#}"
+                      exec="onos-check-logs ${OC#}"
+                      requires="~Check-Components-${#}"/>
+                <step name="Check-Apps-${#}"
+                      exec="onos-check-apps ${OC#} ${ONOS_APPS} includes"
+                      requires="~Check-Components-${#}"/>
+            </parallel>
+        </group>
+    </group>
+</scenario>
\ No newline at end of file
diff --git a/tools/test/scenarios/upgrade.xml b/tools/test/scenarios/upgrade.xml
new file mode 100644
index 0000000..e675632
--- /dev/null
+++ b/tools/test/scenarios/upgrade.xml
@@ -0,0 +1,121 @@
+<!--
+  ~ Copyright 2017-present Open Networking Foundation
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<scenario name="upgrade" description="ONOS cluster upgrade">
+    <group name="Upgrade">
+        <step name="Push-Bits" exec="onos-push-bits-through-proxy" if="${OCT}"/>
+
+        <step name="Initialize-Upgrade"
+              exec="onos ${OC1} issu init"/>
+
+        <group name="Phase-1">
+            <sequential var="${OCMI#}"
+                        starts="Phase-One-Stop-Service-${#}"
+                        ends="Phase-One-Wait-for-Start-${#-1}">
+                <step name="Phase-One-Stop-Service-${#}"
+                      exec="onos-service ${OCMI#} stop"
+                      requires="Initialize-Upgrade"/>
+
+                <step name="Phase-One-Wait-for-Stop-${#}"
+                      exec="onos-wait-for-stop ${OCMI#}"
+                      requires="~Phase-One-Stop-Service-${#}"/>
+
+                <step name="Phase-One-Uninstall-${#}"
+                      exec="onos-uninstall ${OCMI#}"
+                      requires="~Phase-One-Wait-for-Stop-${#}"/>
+
+                <step name="Phase-One-Push-Bits-${#}"
+                      exec="onos-push-bits ${OCMI#}"
+                      unless="${OCT}"
+                      requires="~Phase-One-Stop-Service-${#}"/>
+
+                <step name="Phase-One-Install-Upgrade-${#}"
+                      exec="onos-install -v ${OCMI#}"
+                      requires="Phase-One-Push-Bits-${#},Push-Bits,Phase-One-Uninstall-${#}"/>
+
+                <step name="Phase-One-Secure-SSH-${#}"
+                      exec="onos-secure-ssh -u ${ONOS_WEB_USER} -p ${ONOS_WEB_PASS} ${OCMI#}"
+                      requires="~Phase-One-Install-Upgrade-${#}"/>
+
+                <step name="Phase-One-Wait-for-Start-${#}"
+                      exec="onos-wait-for-start ${OCMI#}"
+                      requires="Phase-One-Secure-SSH-${#}"/>
+            </sequential>
+        </group>
+
+        <step name="Run-Upgrade"
+              exec="onos ${OC1} issu upgrade"
+              requires="Phase-1"/>
+
+        <group name="Phase-2" requires="Run-Upgrade">
+            <sequential var="${OCMA#}"
+                        starts="Phase-Two-Stop-Service-${#}"
+                        ends="Phase-Two-Wait-for-Start-${#-1}">
+                <step name="Phase-Two-Stop-Service-${#}"
+                      exec="onos-service ${OCMA#} stop"
+                      requires="Run-Upgrade"/>
+
+                <step name="Phase-Two-Wait-for-Stop-${#}"
+                      exec="onos-wait-for-stop ${OCMA#}"
+                      requires="~Phase-Two-Stop-Service-${#}"/>
+
+                <step name="Phase-Two-Uninstall-${#}"
+                      exec="onos-uninstall ${OCMA#}"
+                      requires="~Phase-Two-Wait-for-Stop-${#}"/>
+
+                <step name="Phase-Two-Push-Bits-${#}"
+                      exec="onos-push-bits ${OCMA#}"
+                      unless="${OCT}"
+                      requires="~Phase-Two-Stop-Service-${#}"/>
+
+                <step name="Phase-Two-Install-Upgrade-${#}"
+                      exec="onos-install -v ${OCMA#}"
+                      requires="Phase-Two-Push-Bits-${#},Push-Bits,Phase-Two-Uninstall-${#}"/>
+
+                <step name="Phase-Two-Secure-SSH-${#}"
+                      exec="onos-secure-ssh -u ${ONOS_WEB_USER} -p ${ONOS_WEB_PASS} ${OCMA#}"
+                      requires="~Phase-Two-Install-Upgrade-${#}"/>
+
+                <step name="Phase-Two-Wait-for-Start-${#}"
+                      exec="onos-wait-for-start ${OCMA#}"
+                      requires="Phase-Two-Secure-SSH-${#}"/>
+            </sequential>
+        </group>
+
+        <step name="Commit-Upgrade"
+              exec="onos ${OC1} issu commit"
+              requires="Phase-2"/>
+
+        <group name="Verify-Upgrade" requires="Commit-Upgrade">
+            <parallel var="${OC#}">
+                <step name="Check-Nodes-${#}"
+                      exec="onos-check-nodes ${OC#}"
+                      delay="3"
+                      requires="Commit-Upgrade"/>
+                <step name="Check-Components-${#}"
+                      exec="onos-check-components ${OC#}"
+                      delay="5"
+                      requires="~Check-Nodes-${#}"/>
+
+                <step name="Check-Logs-${#}"
+                      exec="onos-check-logs ${OC#}"
+                      requires="~Check-Components-${#}"/>
+                <step name="Check-Apps-${#}"
+                      exec="onos-check-apps ${OC#} ${ONOS_APPS} includes"
+                      requires="~Check-Components-${#}"/>
+            </parallel>
+        </group>
+    </group>
+</scenario>
\ No newline at end of file
diff --git a/web/gui/src/main/java/org/onosproject/ui/impl/PartitionViewMessageHandler.java b/web/gui/src/main/java/org/onosproject/ui/impl/PartitionViewMessageHandler.java
index a0769da..7537871 100644
--- a/web/gui/src/main/java/org/onosproject/ui/impl/PartitionViewMessageHandler.java
+++ b/web/gui/src/main/java/org/onosproject/ui/impl/PartitionViewMessageHandler.java
@@ -88,7 +88,7 @@
         }
 
         private void populateRow(TableModel.Row row, PartitionInfo p) {
-            row.cell(NAME, p.name())
+            row.cell(NAME, p.id())
                     .cell(TERM, p.term())
                     .cell(LEADER, p.leader())
                     .cell(MEMBERS, p.members());