Replace Unified* services with MembershipService for subgroup membership

Change-Id: Iabff173ce3501d1ed300513cac445bb712614bd9
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
index 29045a2..80753f3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -25,12 +25,12 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.UnifiedClusterService;
+import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultPartition;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncAtomicValue;
@@ -70,10 +70,10 @@
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected UnifiedClusterService clusterService;
+    protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected UnifiedClusterCommunicationService clusterCommunicator;
+    protected ClusterCommunicationService clusterCommunicator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PersistenceService persistenceService;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
index 1bfaaed..9d6a016 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapBuilderImpl.java
@@ -15,21 +15,21 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.MembershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicator;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapBuilder;
-
 import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.persistence.PersistenceService;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapBuilder;
+
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -38,8 +38,8 @@
  */
 public class EventuallyConsistentMapBuilderImpl<K, V>
         implements EventuallyConsistentMapBuilder<K, V> {
-    private final MembershipService clusterService;
-    private final ClusterCommunicator clusterCommunicator;
+    private final ClusterService clusterService;
+    private final ClusterCommunicationService clusterCommunicator;
 
     private String name;
     private KryoNamespace serializer;
@@ -64,9 +64,10 @@
      * @param clusterCommunicator cluster communication service
      * @param persistenceService persistence service
      */
-    public EventuallyConsistentMapBuilderImpl(MembershipService clusterService,
-                                              ClusterCommunicator clusterCommunicator,
-                                              PersistenceService persistenceService) {
+    public EventuallyConsistentMapBuilderImpl(
+            ClusterService clusterService,
+            ClusterCommunicationService clusterCommunicator,
+            PersistenceService persistenceService) {
         this.persistenceService = persistenceService;
         this.clusterService = checkNotNull(clusterService);
         this.clusterCommunicator = checkNotNull(clusterCommunicator);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
index 461245e..462d4ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/EventuallyConsistentMapImpl.java
@@ -15,34 +15,6 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang3.tuple.Pair;
-import org.onlab.util.AbstractAccumulator;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.SlidingWindowCounter;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.MembershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.LogicalTimestamp;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicator;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.DistributedPrimitive;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.WallClockTimestamp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -67,6 +39,34 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SlidingWindowCounter;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.persistence.PersistenceService;
+import org.onosproject.store.LogicalTimestamp;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.DistributedPrimitive;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -86,8 +86,8 @@
 
     private final Map<K, MapValue<V>> items;
 
-    private final MembershipService clusterService;
-    private final ClusterCommunicator clusterCommunicator;
+    private final ClusterService clusterService;
+    private final ClusterCommunicationService clusterCommunicator;
     private final Serializer serializer;
     private final NodeId localNodeId;
     private final PersistenceService persistenceService;
@@ -162,8 +162,8 @@
      * @param persistenceService    persistence service
      */
     EventuallyConsistentMapImpl(String mapName,
-                                MembershipService clusterService,
-                                ClusterCommunicator clusterCommunicator,
+                                ClusterService clusterService,
+                                ClusterCommunicationService clusterCommunicator,
                                 KryoNamespace ns,
                                 BiFunction<K, V, Timestamp> timestampProvider,
                                 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 4a92682..5bbd681 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -45,7 +45,7 @@
 import org.onosproject.core.Version;
 import org.onosproject.core.VersionService;
 import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.primitives.PartitionAdminService;
 import org.onosproject.store.primitives.PartitionEvent;
@@ -71,7 +71,7 @@
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected UnifiedClusterCommunicationService clusterCommunicator;
+    protected ClusterCommunicationService clusterCommunicator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterMetadataService metadataService;
@@ -103,24 +103,24 @@
             Version targetVersion = upgradeService.getState().target();
             currentClusterMetadata.get()
                     .getPartitions()
-                    .forEach(partition -> inactivePartitions.put(partition.getId(), new StoragePartition(
-                            partition,
-                            sourceVersion,
-                            null,
-                            clusterCommunicator,
-                            clusterService,
-                            new File(System.getProperty("karaf.data") +
-                                    "/partitions/" + sourceVersion + "/" + partition.getId()))));
-            currentClusterMetadata.get()
-                    .getPartitions()
-                    .forEach(partition -> activePartitions.put(partition.getId(), new StoragePartition(
-                            partition,
-                            targetVersion,
-                            sourceVersion,
-                            clusterCommunicator,
-                            clusterService,
-                            new File(System.getProperty("karaf.data") +
-                                    "/partitions/" + targetVersion + "/" + partition.getId()))));
+                    .forEach(partition -> {
+                        inactivePartitions.put(partition.getId(), new StoragePartition(
+                                partition,
+                                sourceVersion,
+                                null,
+                                clusterCommunicator,
+                                clusterService,
+                                new File(System.getProperty("karaf.data") +
+                                        "/partitions/" + sourceVersion + "/" + partition.getId())));
+                        activePartitions.put(partition.getId(), new StoragePartition(
+                                partition,
+                                targetVersion,
+                                sourceVersion,
+                                clusterCommunicator,
+                                clusterService,
+                                new File(System.getProperty("karaf.data") +
+                                        "/partitions/" + targetVersion + "/" + partition.getId())));
+                    });
 
             // We have to fork existing partitions before we can start inactive partition servers to
             // avoid duplicate message handlers when both servers are running.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
index 8bae2a3..ecba56d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftClientCommunicator.java
@@ -40,7 +40,7 @@
 import io.atomix.protocols.raft.protocol.ResetRequest;
 import io.atomix.protocols.raft.session.SessionId;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicator;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.service.Serializer;
 
 /**
@@ -51,7 +51,7 @@
     public RaftClientCommunicator(
             String prefix,
             Serializer serializer,
-            ClusterCommunicator clusterCommunicator) {
+            ClusterCommunicationService clusterCommunicator) {
         super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
index 765eb02..1117ab9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftCommunicator.java
@@ -22,7 +22,7 @@
 import io.atomix.protocols.raft.RaftException;
 import io.atomix.protocols.raft.cluster.MemberId;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicator;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.cluster.messaging.MessagingException;
 import org.onosproject.store.service.Serializer;
@@ -35,12 +35,12 @@
 public abstract class RaftCommunicator {
     protected final RaftMessageContext context;
     protected final Serializer serializer;
-    protected final ClusterCommunicator clusterCommunicator;
+    protected final ClusterCommunicationService clusterCommunicator;
 
     public RaftCommunicator(
             RaftMessageContext context,
             Serializer serializer,
-            ClusterCommunicator clusterCommunicator) {
+            ClusterCommunicationService clusterCommunicator) {
         this.context = checkNotNull(context, "context cannot be null");
         this.serializer = checkNotNull(serializer, "serializer cannot be null");
         this.clusterCommunicator = checkNotNull(clusterCommunicator, "clusterCommunicator cannot be null");
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
index 2710a2c..097ee46 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/RaftServerCommunicator.java
@@ -57,7 +57,6 @@
 import io.atomix.protocols.raft.session.SessionId;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicator;
 import org.onosproject.store.service.Serializer;
 
 /**
@@ -68,7 +67,7 @@
     public RaftServerCommunicator(
             String prefix,
             Serializer serializer,
-            ClusterCommunicator clusterCommunicator) {
+            ClusterCommunicationService clusterCommunicator) {
         super(new RaftMessageContext(prefix), serializer, clusterCommunicator);
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 59c4b17..b73dd61 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -29,10 +29,10 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.PartitionId;
-import org.onosproject.cluster.UnifiedClusterService;
 import org.onosproject.persistence.PersistenceService;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.primitives.PartitionAdminService;
 import org.onosproject.store.primitives.PartitionService;
@@ -81,10 +81,10 @@
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected UnifiedClusterService clusterService;
+    protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected UnifiedClusterCommunicationService clusterCommunicator;
+    protected ClusterCommunicationService clusterCommunicator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PersistenceService persistenceService;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 414c49c..5458edd 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -29,12 +29,12 @@
 import com.google.common.collect.ImmutableMap;
 import io.atomix.protocols.raft.cluster.MemberId;
 import io.atomix.protocols.raft.service.RaftService;
-import org.onosproject.cluster.MembershipService;
+import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.Partition;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.core.Version;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMapService;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapService;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapService;
@@ -54,8 +54,8 @@
 public class StoragePartition implements Managed<StoragePartition> {
 
     private final AtomicBoolean isOpened = new AtomicBoolean(false);
-    private final UnifiedClusterCommunicationService clusterCommunicator;
-    private final MembershipService clusterService;
+    private final ClusterCommunicationService clusterCommunicator;
+    private final ClusterService clusterService;
     private final Version version;
     private final Version source;
     private final File dataFolder;
@@ -85,8 +85,8 @@
             Partition partition,
             Version version,
             Version source,
-            UnifiedClusterCommunicationService clusterCommunicator,
-            MembershipService clusterService,
+            ClusterCommunicationService clusterCommunicator,
+            ClusterService clusterService,
             File dataFolder) {
         this.partition = partition;
         this.version = version;
@@ -191,6 +191,10 @@
         return source != null ?
                 clusterService.getNodes()
                         .stream()
+                        .filter(node -> {
+                            Version nodeVersion = clusterService.getVersion(node.id());
+                            return nodeVersion != null && nodeVersion.equals(version);
+                        })
                         .map(node -> MemberId.from(node.id().id()))
                         .collect(Collectors.toList()) :
                 Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
@@ -202,6 +206,10 @@
         } else {
             return clusterService.getNodes()
                     .stream()
+                    .filter(node -> {
+                        Version nodeVersion = clusterService.getVersion(node.id());
+                        return nodeVersion != null && nodeVersion.equals(version);
+                    })
                     .map(node -> MemberId.from(node.id().id()))
                     .collect(Collectors.toList());
         }
@@ -231,13 +239,10 @@
                 clusterCommunicator);
 
         CompletableFuture<Void> future;
-        if (clusterService.getNodes().size() == 1) {
+        if (getMemberIds().size() == 1) {
             future = server.fork(version);
         } else {
-            future = server.join(clusterService.getNodes().stream()
-                    .filter(node -> !node.id().equals(localNodeId))
-                    .map(node -> MemberId.from(node.id().id()))
-                    .collect(Collectors.toList()));
+            future = server.join(getMemberIds());
         }
         return future.thenRun(() -> this.server = server);
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index 123c3b6..3a15fce 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -28,7 +28,7 @@
 import io.atomix.protocols.raft.storage.RaftStorage;
 import io.atomix.storage.StorageLevel;
 import org.onosproject.core.Version;
-import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
 import org.onosproject.store.service.PartitionInfo;
 import org.onosproject.store.service.Serializer;
@@ -49,13 +49,13 @@
 
     private final MemberId localMemberId;
     private final StoragePartition partition;
-    private final UnifiedClusterCommunicationService clusterCommunicator;
+    private final ClusterCommunicationService clusterCommunicator;
     private RaftServer server;
 
     public StoragePartitionServer(
             StoragePartition partition,
             MemberId localMemberId,
-            UnifiedClusterCommunicationService clusterCommunicator) {
+            ClusterCommunicationService clusterCommunicator) {
         this.partition = partition;
         this.localMemberId = localMemberId;
         this.clusterCommunicator = clusterCommunicator;