[ONOS-7054] Implement prototype of ISSU protocol

Change-Id: Id543c0de9c97b68f977c824cbc987b35d81beb2d
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
new file mode 100644
index 0000000..29045a2
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.io.File;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.UnifiedClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultPartition;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.persistence.PersistenceService;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncDocumentTree;
+import org.onosproject.store.service.AtomicCounterBuilder;
+import org.onosproject.store.service.AtomicCounterMapBuilder;
+import org.onosproject.store.service.AtomicIdGeneratorBuilder;
+import org.onosproject.store.service.AtomicValueBuilder;
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.ConsistentMultimapBuilder;
+import org.onosproject.store.service.ConsistentTreeMapBuilder;
+import org.onosproject.store.service.CoordinationService;
+import org.onosproject.store.service.DistributedSetBuilder;
+import org.onosproject.store.service.DocumentTreeBuilder;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+import org.onosproject.store.service.LeaderElectorBuilder;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Topic;
+import org.onosproject.store.service.TransactionContextBuilder;
+import org.onosproject.store.service.WorkQueue;
+import org.slf4j.Logger;
+
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of {@code CoordinationService} that uses a {@link StoragePartition} that spans all the nodes
+ * in the cluster regardless of version.
+ */
+@Service
+@Component(immediate = true)
+public class CoordinationManager implements CoordinationService {
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected UnifiedClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected UnifiedClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected PersistenceService persistenceService;
+
+    private StoragePartition partition;
+    private DistributedPrimitiveCreator primitiveCreator;
+
+    @Activate
+    public void activate() {
+        partition = new StoragePartition(
+                new DefaultPartition(
+                        PartitionId.SHARED,
+                        clusterService.getNodes()
+                                .stream()
+                                .map(ControllerNode::id)
+                                .collect(Collectors.toSet())),
+                null,
+                null,
+                clusterCommunicator,
+                clusterService,
+                new File(System.getProperty("karaf.data") + "/partitions/coordination"));
+        partition.open().join();
+        primitiveCreator = partition.client();
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new EventuallyConsistentMapBuilderImpl<>(clusterService,
+                clusterCommunicator,
+                persistenceService);
+    }
+
+    @Override
+    public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultConsistentMapBuilder<>(primitiveCreator);
+    }
+
+    @Override
+    public <V> DocumentTreeBuilder<V> documentTreeBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultDocumentTreeBuilder<>(primitiveCreator);
+    }
+
+    @Override
+    public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
+        return new DefaultConsistentTreeMapBuilder<>(primitiveCreator);
+    }
+
+    @Override
+    public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultConsistentMultimapBuilder<>(primitiveCreator);
+    }
+
+    @Override
+    public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultAtomicCounterMapBuilder<>(primitiveCreator);
+    }
+
+    @Override
+    public <E> DistributedSetBuilder<E> setBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultDistributedSetBuilder<>(() -> this.<E, Boolean>consistentMapBuilder());
+    }
+
+    @Override
+    public AtomicCounterBuilder atomicCounterBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultAtomicCounterBuilder(primitiveCreator);
+    }
+
+    @Override
+    public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultAtomicIdGeneratorBuilder(primitiveCreator);
+    }
+
+    @Override
+    public <V> AtomicValueBuilder<V> atomicValueBuilder() {
+        checkPermission(STORAGE_WRITE);
+        Supplier<ConsistentMapBuilder<String, byte[]>> mapBuilderSupplier =
+                () -> this.<String, byte[]>consistentMapBuilder()
+                          .withName("onos-atomic-values")
+                          .withSerializer(Serializer.using(KryoNamespaces.BASIC));
+        return new DefaultAtomicValueBuilder<>(mapBuilderSupplier);
+    }
+
+    @Override
+    public TransactionContextBuilder transactionContextBuilder() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public LeaderElectorBuilder leaderElectorBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new DefaultLeaderElectorBuilder(primitiveCreator);
+    }
+
+    @Override
+    public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
+        checkPermission(STORAGE_WRITE);
+        return primitiveCreator.newWorkQueue(name, serializer);
+    }
+
+    @Override
+    public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
+        checkPermission(STORAGE_WRITE);
+        return primitiveCreator.newAsyncDocumentTree(name, serializer);
+    }
+
+    @Override
+    public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(
+            String name, Serializer serializer) {
+        checkPermission(STORAGE_WRITE);
+        return primitiveCreator.newAsyncConsistentSetMultimap(name,
+                                                                serializer);
+    }
+
+    @Override
+    public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(
+            String name, Serializer serializer) {
+        checkPermission(STORAGE_WRITE);
+        return primitiveCreator.newAsyncConsistentTreeMap(name, serializer);
+    }
+
+    @Override
+    public <T> Topic<T> getTopic(String name, Serializer serializer) {
+        AsyncAtomicValue<T> atomicValue = this.<T>atomicValueBuilder()
+                                              .withName("topic-" + name)
+                                              .withSerializer(serializer)
+                                              .build();
+        return new DefaultDistributedTopic<>(atomicValue);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
index d15451e..1bfaaed 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
@@ -16,11 +16,11 @@
 package org.onosproject.store.primitives.impl;
 
 import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.MembershipService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
 
@@ -38,8 +38,8 @@
  */
 public class EventuallyConsistentMapBuilderImpl<K, V>
         implements EventuallyConsistentMapBuilder<K, V> {
-    private final ClusterService clusterService;
-    private final ClusterCommunicationService clusterCommunicator;
+    private final MembershipService clusterService;
+    private final ClusterCommunicator clusterCommunicator;
 
     private String name;
     private KryoNamespace serializer;
@@ -64,8 +64,8 @@
      * @param clusterCommunicator cluster communication service
      * @param persistenceService persistence service
      */
-    public EventuallyConsistentMapBuilderImpl(ClusterService clusterService,
-                                              ClusterCommunicationService clusterCommunicator,
+    public EventuallyConsistentMapBuilderImpl(MembershipService clusterService,
+                                              ClusterCommunicator clusterCommunicator,
                                               PersistenceService persistenceService) {
         this.persistenceService = persistenceService;
         this.clusterService = checkNotNull(clusterService);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index bed19d5..461245e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -25,13 +25,13 @@
 import org.onlab.util.AbstractAccumulator;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.SlidingWindowCounter;
-import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.MembershipService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.LogicalTimestamp;
 import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.DistributedPrimitive;
@@ -86,8 +86,8 @@
 
     private final Map<K, MapValue<V>> items;
 
-    private final ClusterService clusterService;
-    private final ClusterCommunicationService clusterCommunicator;
+    private final MembershipService clusterService;
+    private final ClusterCommunicator clusterCommunicator;
     private final Serializer serializer;
     private final NodeId localNodeId;
     private final PersistenceService persistenceService;
@@ -162,8 +162,8 @@
      * @param persistenceService    persistence service
      */
     EventuallyConsistentMapImpl(String mapName,
-                                ClusterService clusterService,
-                                ClusterCommunicationService clusterCommunicator,
+                                MembershipService clusterService,
+                                ClusterCommunicator clusterCommunicator,
                                 KryoNamespace ns,
                                 BiFunction<K, V, Timestamp> timestampProvider,
                                 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 500b75c..4a92682 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -42,8 +42,10 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.PartitionDiff;
 import org.onosproject.cluster.PartitionId;
+import org.onosproject.core.Version;
+import org.onosproject.core.VersionService;
 import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.primitives.PartitionAdminService;
 import org.onosproject.store.primitives.PartitionEvent;
@@ -51,6 +53,7 @@
 import org.onosproject.store.primitives.PartitionService;
 import org.onosproject.store.service.PartitionClientInfo;
 import org.onosproject.store.service.PartitionInfo;
+import org.onosproject.upgrade.UpgradeService;
 import org.slf4j.Logger;
 
 import static org.onosproject.security.AppGuard.checkPermission;
@@ -68,7 +71,7 @@
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterCommunicationService clusterCommunicator;
+    protected UnifiedClusterCommunicationService clusterCommunicator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterMetadataService metadataService;
@@ -76,7 +79,14 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
-    private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected UpgradeService upgradeService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected VersionService versionService;
+
+    private final Map<PartitionId, StoragePartition> inactivePartitions = Maps.newConcurrentMap();
+    private final Map<PartitionId, StoragePartition> activePartitions = Maps.newConcurrentMap();
     private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
     private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
 
@@ -85,17 +95,58 @@
         eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
         currentClusterMetadata.set(metadataService.getClusterMetadata());
         metadataService.addListener(metadataListener);
-        currentClusterMetadata.get()
-                       .getPartitions()
-                       .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
-                               clusterCommunicator,
-                               clusterService,
-                               new File(System.getProperty("karaf.data") + "/partitions/" + partition.getId()))));
 
-        CompletableFuture<Void> openFuture = CompletableFuture.allOf(partitions.values()
-                                                                               .stream()
-                                                                               .map(StoragePartition::open)
-                                                                               .toArray(CompletableFuture[]::new));
+        // If an upgrade is currently in progress and this node is an upgraded node, initialize upgrade partitions.
+        CompletableFuture<Void> openFuture;
+        if (upgradeService.isUpgrading() && upgradeService.isLocalUpgraded()) {
+            Version sourceVersion = upgradeService.getState().source();
+            Version targetVersion = upgradeService.getState().target();
+            currentClusterMetadata.get()
+                    .getPartitions()
+                    .forEach(partition -> inactivePartitions.put(partition.getId(), new StoragePartition(
+                            partition,
+                            sourceVersion,
+                            null,
+                            clusterCommunicator,
+                            clusterService,
+                            new File(System.getProperty("karaf.data") +
+                                    "/partitions/" + sourceVersion + "/" + partition.getId()))));
+            currentClusterMetadata.get()
+                    .getPartitions()
+                    .forEach(partition -> activePartitions.put(partition.getId(), new StoragePartition(
+                            partition,
+                            targetVersion,
+                            sourceVersion,
+                            clusterCommunicator,
+                            clusterService,
+                            new File(System.getProperty("karaf.data") +
+                                    "/partitions/" + targetVersion + "/" + partition.getId()))));
+
+            // We have to fork existing partitions before we can start inactive partition servers to
+            // avoid duplicate message handlers when both servers are running.
+            openFuture = CompletableFuture.allOf(activePartitions.values().stream()
+                    .map(StoragePartition::open)
+                    .toArray(CompletableFuture[]::new))
+                    .thenCompose(v -> CompletableFuture.allOf(inactivePartitions.values().stream()
+                            .map(StoragePartition::open)
+                            .toArray(CompletableFuture[]::new)));
+        } else {
+            Version version = versionService.version();
+            currentClusterMetadata.get()
+                    .getPartitions()
+                    .forEach(partition -> activePartitions.put(partition.getId(), new StoragePartition(
+                            partition,
+                            version,
+                            null,
+                            clusterCommunicator,
+                            clusterService,
+                            new File(System.getProperty("karaf.data") +
+                                    "/partitions/" + version + "/" + partition.getId()))));
+            openFuture = CompletableFuture.allOf(activePartitions.values().stream()
+                    .map(StoragePartition::open)
+                    .toArray(CompletableFuture[]::new));
+        }
+
         openFuture.join();
         log.info("Started");
     }
@@ -105,10 +156,13 @@
         metadataService.removeListener(metadataListener);
         eventDispatcher.removeSink(PartitionEvent.class);
 
-        CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
-                                                                                .stream()
-                                                                                .map(StoragePartition::close)
-                                                                                .toArray(CompletableFuture[]::new));
+        CompletableFuture<Void> closeFuture = CompletableFuture.allOf(
+                CompletableFuture.allOf(inactivePartitions.values().stream()
+                        .map(StoragePartition::close)
+                        .toArray(CompletableFuture[]::new)),
+                CompletableFuture.allOf(activePartitions.values().stream()
+                        .map(StoragePartition::close)
+                        .toArray(CompletableFuture[]::new)));
         closeFuture.join();
         log.info("Stopped");
     }
@@ -116,25 +170,25 @@
     @Override
     public int getNumberOfPartitions() {
         checkPermission(PARTITION_READ);
-        return partitions.size();
+        return activePartitions.size();
     }
 
     @Override
     public Set<PartitionId> getAllPartitionIds() {
         checkPermission(PARTITION_READ);
-        return partitions.keySet();
+        return activePartitions.keySet();
     }
 
     @Override
     public DistributedPrimitiveCreator getDistributedPrimitiveCreator(PartitionId partitionId) {
         checkPermission(PARTITION_READ);
-        return partitions.get(partitionId).client();
+        return activePartitions.get(partitionId).client();
     }
 
     @Override
     public Set<NodeId> getConfiguredMembers(PartitionId partitionId) {
         checkPermission(PARTITION_READ);
-        StoragePartition partition = partitions.get(partitionId);
+        StoragePartition partition = activePartitions.get(partitionId);
         return ImmutableSet.copyOf(partition.getMembers());
     }
 
@@ -148,7 +202,7 @@
 
     @Override
     public List<PartitionInfo> partitionInfo() {
-        return partitions.values()
+        return activePartitions.values()
                          .stream()
                          .flatMap(x -> Tools.stream(x.info()))
                          .collect(Collectors.toList());
@@ -161,7 +215,7 @@
                     .values()
                     .stream()
                     .filter(PartitionDiff::hasChanged)
-                    .forEach(diff -> partitions.get(diff.partitionId()).onUpdate(diff.newValue()));
+                    .forEach(diff -> activePartitions.get(diff.partitionId()).onUpdate(diff.newValue()));
     }
 
     private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
@@ -173,7 +227,7 @@
 
     @Override
     public List<PartitionClientInfo> partitionClientInfo() {
-        return partitions.values()
+        return activePartitions.values()
                          .stream()
                          .map(StoragePartition::client)
                          .map(StoragePartitionClient::clientInfo)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
index 40e9fa0..8bae2a3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
@@ -40,8 +40,7 @@
 import io.atomix.protocols.raft.protocol.ResetRequest;
 import io.atomix.protocols.raft.session.SessionId;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
 import org.onosproject.store.service.Serializer;
 
 /**
@@ -50,10 +49,10 @@
 public class RaftClientCommunicator extends RaftCommunicator implements RaftClientProtocol {
 
     public RaftClientCommunicator(
-            PartitionId partitionId,
+            String prefix,
             Serializer serializer,
-            ClusterCommunicationService clusterCommunicator) {
-        super(new RaftMessageContext(String.format("partition-%d", partitionId.id())), serializer, clusterCommunicator);
+            ClusterCommunicator clusterCommunicator) {
+        super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
index 1117ab9..765eb02 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
@@ -22,7 +22,7 @@
 import io.atomix.protocols.raft.RaftException;
 import io.atomix.protocols.raft.cluster.MemberId;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.cluster.messaging.MessagingException;
 import org.onosproject.store.service.Serializer;
@@ -35,12 +35,12 @@
 public abstract class RaftCommunicator {
     protected final RaftMessageContext context;
     protected final Serializer serializer;
-    protected final ClusterCommunicationService clusterCommunicator;
+    protected final ClusterCommunicator clusterCommunicator;
 
     public RaftCommunicator(
             RaftMessageContext context,
             Serializer serializer,
-            ClusterCommunicationService clusterCommunicator) {
+            ClusterCommunicator clusterCommunicator) {
         this.context = checkNotNull(context, "context cannot be null");
         this.serializer = checkNotNull(serializer, "serializer cannot be null");
         this.clusterCommunicator = checkNotNull(clusterCommunicator, "clusterCommunicator cannot be null");
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
index 9b8f3e6..2710a2c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
@@ -56,8 +56,8 @@
 import io.atomix.protocols.raft.protocol.VoteResponse;
 import io.atomix.protocols.raft.session.SessionId;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicator;
 import org.onosproject.store.service.Serializer;
 
 /**
@@ -66,10 +66,10 @@
 public class RaftServerCommunicator extends RaftCommunicator implements RaftServerProtocol {
 
     public RaftServerCommunicator(
-            PartitionId partitionId,
+            String prefix,
             Serializer serializer,
-            ClusterCommunicationService clusterCommunicator) {
-        super(new RaftMessageContext(String.format("partition-%d", partitionId.id())), serializer, clusterCommunicator);
+            ClusterCommunicator clusterCommunicator) {
+        super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 822641c..59c4b17 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -15,10 +15,6 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
-import static org.slf4j.LoggerFactory.getLogger;
-
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -26,25 +22,26 @@
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.PartitionId;
+import org.onosproject.cluster.UnifiedClusterService;
 import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.primitives.PartitionAdminService;
 import org.onosproject.store.primitives.PartitionService;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncAtomicValue;
-import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.AsyncConsistentMultimap;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.AtomicCounterBuilder;
 import org.onosproject.store.service.AtomicCounterMapBuilder;
 import org.onosproject.store.service.AtomicIdGeneratorBuilder;
@@ -68,7 +65,9 @@
 import org.onosproject.store.service.WorkQueueStats;
 import org.slf4j.Logger;
 
-import com.google.common.collect.Maps;
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Implementation for {@code StorageService} and {@code StorageAdminService}.
@@ -82,10 +81,10 @@
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterService clusterService;
+    protected UnifiedClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterCommunicationService clusterCommunicator;
+    protected UnifiedClusterCommunicationService clusterCommunicator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PersistenceService persistenceService;
@@ -105,7 +104,7 @@
     public void activate() {
         Map<PartitionId, DistributedPrimitiveCreator> partitionMap = Maps.newHashMap();
         partitionService.getAllPartitionIds().stream()
-            .filter(id -> !id.equals(PartitionId.from(0)))
+            .filter(id -> !id.equals(PartitionId.SHARED))
             .forEach(id -> partitionMap.put(id, partitionService.getDistributedPrimitiveCreator(id)));
         federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap, BUCKETS);
         transactionManager = new TransactionManager(this, partitionService, BUCKETS);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 842e047..414c49c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -29,11 +29,12 @@
 import com.google.common.collect.ImmutableMap;
 import io.atomix.protocols.raft.cluster.MemberId;
 import io.atomix.protocols.raft.service.RaftService;
-import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.MembershipService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.Partition;
 import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.core.Version;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
 import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapService;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapService;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapService;
@@ -53,8 +54,11 @@
 public class StoragePartition implements Managed<StoragePartition> {
 
     private final AtomicBoolean isOpened = new AtomicBoolean(false);
-    private final ClusterCommunicationService clusterCommunicator;
-    private final File logFolder;
+    private final UnifiedClusterCommunicationService clusterCommunicator;
+    private final MembershipService clusterService;
+    private final Version version;
+    private final Version source;
+    private final File dataFolder;
     private Partition partition;
     private NodeId localNodeId;
     private StoragePartitionServer server;
@@ -77,14 +81,20 @@
                             () -> new AtomixDocumentTreeService(Ordering.INSERTION))
                     .build();
 
-    public StoragePartition(Partition partition,
-            ClusterCommunicationService clusterCommunicator,
-            ClusterService clusterService,
-            File logFolder) {
+    public StoragePartition(
+            Partition partition,
+            Version version,
+            Version source,
+            UnifiedClusterCommunicationService clusterCommunicator,
+            MembershipService clusterService,
+            File dataFolder) {
         this.partition = partition;
+        this.version = version;
+        this.source = source;
         this.clusterCommunicator = clusterCommunicator;
+        this.clusterService = clusterService;
         this.localNodeId = clusterService.getLocalNode().id();
-        this.logFolder = logFolder;
+        this.dataFolder = dataFolder;
     }
 
     /**
@@ -97,7 +107,12 @@
 
     @Override
     public CompletableFuture<Void> open() {
-        if (partition.getMembers().contains(localNodeId)) {
+        if (source != null) {
+            return forkServer(source)
+                    .thenCompose(v -> openClient())
+                    .thenAccept(v -> isOpened.set(true))
+                    .thenApply(v -> null);
+        } else if (partition.getMembers().contains(localNodeId)) {
             return openServer()
                     .thenCompose(v -> openClient())
                     .thenAccept(v -> isOpened.set(true))
@@ -116,6 +131,43 @@
     }
 
     /**
+     * Returns the partition name.
+     *
+     * @return the partition name
+     */
+    public String getName() {
+        return getName(version);
+    }
+
+    /**
+     * Returns the partition name for the given version.
+     *
+     * @param version the version for which to return the partition name
+     * @return the partition name for the given version
+     */
+    String getName(Version version) {
+        return version != null ? String.format("partition-%d-%s", partition.getId().id(), version) : "partition-core";
+    }
+
+    /**
+     * Returns the partition version.
+     *
+     * @return the partition version
+     */
+    public Version getVersion() {
+        return version;
+    }
+
+    /**
+     * Returns the partition data folder.
+     *
+     * @return the partition data folder
+     */
+    public File getDataFolder() {
+        return dataFolder;
+    }
+
+    /**
      * Returns the identifier of the {@link Partition partition} associated with this instance.
      * @return partition identifier
      */
@@ -136,7 +188,23 @@
      * @return partition member identifiers
      */
     public Collection<MemberId> getMemberIds() {
-        return Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
+        return source != null ?
+                clusterService.getNodes()
+                        .stream()
+                        .map(node -> MemberId.from(node.id().id()))
+                        .collect(Collectors.toList()) :
+                Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
+    }
+
+    Collection<MemberId> getMemberIds(Version version) {
+        if (source == null || version.equals(source)) {
+            return Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
+        } else {
+            return clusterService.getNodes()
+                    .stream()
+                    .map(node -> MemberId.from(node.id().id()))
+                    .collect(Collectors.toList());
+        }
     }
 
     /**
@@ -144,20 +212,37 @@
      * @return future that is completed after the operation is complete
      */
     private CompletableFuture<Void> openServer() {
-        if (!partition.getMembers().contains(localNodeId) || server != null) {
-            return CompletableFuture.completedFuture(null);
-        }
-        StoragePartitionServer server = new StoragePartitionServer(this,
+        StoragePartitionServer server = new StoragePartitionServer(
+                this,
                 MemberId.from(localNodeId.id()),
-                () -> new RaftServerCommunicator(
-                        partition.getId(),
-                        Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
-                        clusterCommunicator),
-                logFolder);
+                clusterCommunicator);
         return server.open().thenRun(() -> this.server = server);
     }
 
     /**
+     * Forks the server from the given version.
+     *
+     * @return future to be completed once the server has been forked
+     */
+    private CompletableFuture<Void> forkServer(Version version) {
+        StoragePartitionServer server = new StoragePartitionServer(
+                this,
+                MemberId.from(localNodeId.id()),
+                clusterCommunicator);
+
+        CompletableFuture<Void> future;
+        if (clusterService.getNodes().size() == 1) {
+            future = server.fork(version);
+        } else {
+            future = server.join(clusterService.getNodes().stream()
+                    .filter(node -> !node.id().equals(localNodeId))
+                    .map(node -> MemberId.from(node.id().id()))
+                    .collect(Collectors.toList()));
+        }
+        return future.thenRun(() -> this.server = server);
+    }
+
+    /**
      * Attempts to join the partition as a new member.
      * @return future that is completed after the operation is complete
      */
@@ -168,11 +253,7 @@
                  .collect(Collectors.toSet());
         StoragePartitionServer server = new StoragePartitionServer(this,
                 MemberId.from(localNodeId.id()),
-                () -> new RaftServerCommunicator(
-                        partition.getId(),
-                        Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
-                        clusterCommunicator),
-                logFolder);
+                clusterCommunicator);
         return server.join(Collections2.transform(otherMembers, n -> MemberId.from(n.id())))
                 .thenRun(() -> this.server = server);
     }
@@ -181,7 +262,7 @@
         client = new StoragePartitionClient(this,
                 MemberId.from(localNodeId.id()),
                 new RaftClientCommunicator(
-                        partition.getId(),
+                        getName(),
                         Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
                         clusterCommunicator));
         return client.open().thenApply(v -> client);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 8f1ffa3..96e5140 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -92,7 +92,6 @@
                 log.info("Failed to start client for partition {}", partition.getId(), e);
             }
         }).thenApply(v -> null);
-
     }
 
     @Override
@@ -316,7 +315,7 @@
     private RaftClient newRaftClient(RaftClientProtocol protocol) {
         return RaftClient.newBuilder()
                 .withClientId("partition-" + partition.getId())
-                .withMemberId(MemberId.from(localMemberId.id()))
+                .withMemberId(localMemberId)
                 .withProtocol(protocol)
                 .build();
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
index 04c0bdf..1bed89d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
@@ -99,7 +99,7 @@
     public PartitionInfo toPartitionInfo() {
         Function<RaftMember, String> memberToString =
                 m -> m == null ? "none" : m.memberId().toString();
-        return new PartitionInfo(partitionId.toString(),
+        return new PartitionInfo(partitionId,
                 leaderTerm,
                 activeMembers.stream().map(memberToString).collect(Collectors.toList()),
                 memberToString.apply(leader));
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index bb071a2..123c3b6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -16,16 +16,19 @@
 package org.onosproject.store.primitives.impl;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
 
 import io.atomix.protocols.raft.RaftServer;
 import io.atomix.protocols.raft.cluster.MemberId;
-import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.cluster.RaftMember;
 import io.atomix.protocols.raft.storage.RaftStorage;
 import io.atomix.storage.StorageLevel;
+import org.onosproject.core.Version;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
 import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
 import org.onosproject.store.service.PartitionInfo;
 import org.onosproject.store.service.Serializer;
@@ -46,23 +49,21 @@
 
     private final MemberId localMemberId;
     private final StoragePartition partition;
-    private final Supplier<RaftServerProtocol> protocol;
-    private final File dataFolder;
+    private final UnifiedClusterCommunicationService clusterCommunicator;
     private RaftServer server;
 
     public StoragePartitionServer(
             StoragePartition partition,
             MemberId localMemberId,
-            Supplier<RaftServerProtocol> protocol,
-            File dataFolder) {
+            UnifiedClusterCommunicationService clusterCommunicator) {
         this.partition = partition;
         this.localMemberId = localMemberId;
-        this.protocol = protocol;
-        this.dataFolder = dataFolder;
+        this.clusterCommunicator = clusterCommunicator;
     }
 
     @Override
     public CompletableFuture<Void> open() {
+        log.info("Starting server for partition {} ({})", partition.getId(), partition.getVersion());
         CompletableFuture<RaftServer> serverOpenFuture;
         if (partition.getMemberIds().contains(localMemberId)) {
             if (server != null && server.isRunning()) {
@@ -77,9 +78,11 @@
         }
         return serverOpenFuture.whenComplete((r, e) -> {
             if (e == null) {
-                log.info("Successfully started server for partition {}", partition.getId());
+                log.info("Successfully started server for partition {} ({})",
+                        partition.getId(), partition.getVersion());
             } else {
-                log.info("Failed to start server for partition {}", partition.getId(), e);
+                log.info("Failed to start server for partition {} ({})",
+                        partition.getId(), partition.getVersion(), e);
             }
         }).thenApply(v -> null);
     }
@@ -97,16 +100,68 @@
         return server.leave();
     }
 
-    private RaftServer buildServer() {
+    /**
+     * Forks the existing partition into a new partition.
+     *
+     * @param version the version from which to fork the server
+     * @return future to be completed once the fork operation is complete
+     */
+    public CompletableFuture<Void> fork(Version version) {
+        log.info("Forking server for partition {} ({}->{})", partition.getId(), version, partition.getVersion());
         RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
-                .withName("partition-" + partition.getId())
-                .withProtocol(protocol.get())
+                .withName(partition.getName(version))
+                .withType(RaftMember.Type.PASSIVE)
+                .withProtocol(new RaftServerCommunicator(
+                        partition.getName(version),
+                        Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+                        clusterCommunicator))
                 .withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
                 .withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
                 .withStorage(RaftStorage.newBuilder()
                         .withStorageLevel(StorageLevel.MAPPED)
                         .withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
-                        .withDirectory(dataFolder)
+                        .withDirectory(partition.getDataFolder())
+                        .withMaxSegmentSize(MAX_SEGMENT_SIZE)
+                        .build());
+        StoragePartition.RAFT_SERVICES.forEach(builder::addService);
+        RaftServer server = builder.build();
+        return server.join(partition.getMemberIds(version))
+                .thenCompose(v -> server.shutdown())
+                .thenCompose(v -> {
+                    // Delete the cluster configuration file from the forked partition.
+                    try {
+                        Files.delete(new File(partition.getDataFolder(), "atomix.conf").toPath());
+                    } catch (IOException e) {
+                        log.error("Failed to delete partition configuration: {}", e);
+                    }
+
+                    // Build and bootstrap a new server.
+                    this.server = buildServer();
+                    return this.server.bootstrap();
+                }).whenComplete((r, e) -> {
+                    if (e == null) {
+                        log.info("Successfully forked server for partition {} ({}->{})",
+                                partition.getId(), version, partition.getVersion());
+                    } else {
+                        log.info("Failed to fork server for partition {} ({}->{})",
+                                partition.getId(), version, partition.getVersion(), e);
+                    }
+                }).thenApply(v -> null);
+    }
+
+    private RaftServer buildServer() {
+        RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
+                .withName(partition.getName())
+                .withProtocol(new RaftServerCommunicator(
+                        partition.getName(),
+                        Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+                        clusterCommunicator))
+                .withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
+                .withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
+                .withStorage(RaftStorage.newBuilder()
+                        .withStorageLevel(StorageLevel.MAPPED)
+                        .withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
+                        .withDirectory(partition.getDataFolder())
                         .withMaxSegmentSize(MAX_SEGMENT_SIZE)
                         .build());
         StoragePartition.RAFT_SERVICES.forEach(builder::addService);
@@ -114,12 +169,13 @@
     }
 
     public CompletableFuture<Void> join(Collection<MemberId> otherMembers) {
+        log.info("Joining partition {} ({})", partition.getId(), partition.getVersion());
         server = buildServer();
         return server.join(otherMembers).whenComplete((r, e) -> {
             if (e == null) {
-                log.info("Successfully joined partition {}", partition.getId());
+                log.info("Successfully joined partition {} ({})", partition.getId(), partition.getVersion());
             } else {
-                log.info("Failed to join partition {}", partition.getId(), e);
+                log.info("Failed to join partition {} ({})", partition.getId(), partition.getVersion(), e);
             }
         }).thenApply(v -> null);
     }