Upgrade to Atomix 3.0-rc5

* Upgrade Raft primitives to Atomix 3.0
* Replace cluster store and messaging implementations with Atomix cluster management/messaging
* Add test scripts for installing/starting Atomix cluster
* Replace core primitives with Atomix primitives

Change-Id: I7623653c81292a34f21b01f5f38ca11b5ef15cad
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 719a96e..dd71e38 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -18,11 +18,17 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Maps;
+import io.atomix.core.Atomix;
+import io.atomix.core.counter.AtomicCounter;
+import io.atomix.core.counter.AtomicCounterType;
+import io.atomix.core.map.AtomicMapType;
+import io.atomix.core.workqueue.WorkQueueType;
+import io.atomix.primitive.partition.PartitionGroup;
+import io.atomix.protocols.raft.MultiRaftProtocol;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -34,15 +40,12 @@
 import org.onosproject.cluster.Member;
 import org.onosproject.cluster.MembershipService;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.PartitionId;
 import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.impl.AtomixManager;
 import org.onosproject.store.primitives.PartitionAdminService;
-import org.onosproject.store.primitives.PartitionService;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.AsyncAtomicValue;
 import org.onosproject.store.service.AsyncConsistentMultimap;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
 import org.onosproject.store.service.AsyncDocumentTree;
@@ -50,7 +53,6 @@
 import org.onosproject.store.service.AtomicCounterMapBuilder;
 import org.onosproject.store.service.AtomicIdGeneratorBuilder;
 import org.onosproject.store.service.AtomicValueBuilder;
-import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.ConsistentMultimapBuilder;
 import org.onosproject.store.service.ConsistentTreeMapBuilder;
@@ -68,6 +70,7 @@
 import org.onosproject.store.service.TopicBuilder;
 import org.onosproject.store.service.TransactionContextBuilder;
 import org.onosproject.store.service.WorkQueue;
+import org.onosproject.store.service.WorkQueueBuilder;
 import org.onosproject.store.service.WorkQueueStats;
 import org.slf4j.Logger;
 
@@ -82,8 +85,6 @@
 @Component(immediate = true)
 public class StorageManager implements StorageService, StorageAdminService {
 
-    private static final int BUCKETS = 128;
-
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -96,26 +97,21 @@
     protected PersistenceService persistenceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected PartitionService partitionService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PartitionAdminService partitionAdminService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MembershipService membershipService;
 
-    private final Supplier<TransactionId> transactionIdGenerator =
-            () -> TransactionId.from(UUID.randomUUID().toString());
-    private DistributedPrimitiveCreator federatedPrimitiveCreator;
-    private TransactionManager transactionManager;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected AtomixManager atomixManager;
+
+    private Atomix atomix;
+    private PartitionGroup group;
 
     @Activate
     public void activate() {
-        Map<PartitionId, DistributedPrimitiveCreator> partitionMap = Maps.newHashMap();
-        partitionService.getAllPartitionIds().stream()
-            .forEach(id -> partitionMap.put(id, partitionService.getDistributedPrimitiveCreator(id)));
-        federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap, BUCKETS);
-        transactionManager = new TransactionManager(this, partitionService, BUCKETS);
+        atomix = atomixManager.getAtomix();
+        group = atomix.getPartitionService().getPartitionGroup(MultiRaftProtocol.TYPE);
         log.info("Started");
     }
 
@@ -137,175 +133,212 @@
 
         // Use the MembershipService to provide peers for the map that are isolated within the current version.
         Supplier<List<NodeId>> peersSupplier = () -> membershipService.getMembers().stream()
-                .map(Member::nodeId)
-                .filter(nodeId -> !nodeId.equals(localNodeId))
-                .filter(id -> clusterService.getState(id).isActive())
-                .collect(Collectors.toList());
+            .map(Member::nodeId)
+            .filter(nodeId -> !nodeId.equals(localNodeId))
+            .filter(id -> clusterService.getState(id).isActive())
+            .collect(Collectors.toList());
 
         // If this is the first node in its version, bootstrap from the previous version. Otherwise, bootstrap the
         // map from members isolated within the current version.
         Supplier<List<NodeId>> bootstrapPeersSupplier = () -> {
             if (membershipService.getMembers().size() == 1) {
                 return clusterService.getNodes()
-                        .stream()
-                        .map(ControllerNode::id)
-                        .filter(id -> !localNodeId.equals(id))
-                        .filter(id -> clusterService.getState(id).isActive())
-                        .collect(Collectors.toList());
+                    .stream()
+                    .map(ControllerNode::id)
+                    .filter(id -> !localNodeId.equals(id))
+                    .filter(id -> clusterService.getState(id).isActive())
+                    .collect(Collectors.toList());
             } else {
                 return membershipService.getMembers()
-                        .stream()
-                        .map(Member::nodeId)
-                        .filter(id -> !localNodeId.equals(id))
-                        .filter(id -> clusterService.getState(id).isActive())
-                        .collect(Collectors.toList());
+                    .stream()
+                    .map(Member::nodeId)
+                    .filter(id -> !localNodeId.equals(id))
+                    .filter(id -> clusterService.getState(id).isActive())
+                    .collect(Collectors.toList());
             }
         };
 
         return new EventuallyConsistentMapBuilderImpl<>(
-                localNodeId,
-                clusterCommunicator,
-                persistenceService,
-                peersSupplier,
-                bootstrapPeersSupplier
+            localNodeId,
+            clusterCommunicator,
+            persistenceService,
+            peersSupplier,
+            bootstrapPeersSupplier
         );
     }
 
     @Override
     public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator);
+        return new AtomixConsistentMapBuilder<>(atomix, group.name());
     }
 
     @Override
     public <V> DocumentTreeBuilder<V> documentTreeBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultDocumentTreeBuilder<V>(federatedPrimitiveCreator);
+        return new AtomixDocumentTreeBuilder<V>(atomix, group.name());
     }
 
     @Override
     public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
-        return new DefaultConsistentTreeMapBuilder<V>(
-                federatedPrimitiveCreator);
+        return new AtomixConsistentTreeMapBuilder<>(atomix, group.name());
     }
 
     @Override
     public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultConsistentMultimapBuilder<K, V>(
-                federatedPrimitiveCreator);
+        return new AtomixConsistentMultimapBuilder<>(atomix, group.name());
     }
 
     @Override
     public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultAtomicCounterMapBuilder<>(federatedPrimitiveCreator);
+        return new AtomixAtomicCounterMapBuilder<>(atomix, group.name());
     }
 
     @Override
     public <E> DistributedSetBuilder<E> setBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultDistributedSetBuilder<>(() -> this.<E, Boolean>consistentMapBuilder());
+        return new AtomixDistributedSetBuilder<>(atomix, group.name());
     }
 
     @Override
     public AtomicCounterBuilder atomicCounterBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultAtomicCounterBuilder(federatedPrimitiveCreator);
+        return new AtomixAtomicCounterBuilder(atomix, group.name());
     }
 
     @Override
     public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultAtomicIdGeneratorBuilder(federatedPrimitiveCreator);
+        return new AtomixAtomicIdGeneratorBuilder(atomix, group.name());
     }
 
     @Override
     public <V> AtomicValueBuilder<V> atomicValueBuilder() {
         checkPermission(STORAGE_WRITE);
-        Supplier<ConsistentMapBuilder<String, byte[]>> mapBuilderSupplier =
-                () -> this.<String, byte[]>consistentMapBuilder()
-                          .withName("onos-atomic-values")
-                          .withSerializer(Serializer.using(KryoNamespaces.BASIC));
-        return new DefaultAtomicValueBuilder<>(mapBuilderSupplier);
+        return new AtomixAtomicValueBuilder<>(atomix, group.name());
     }
 
     @Override
     public TransactionContextBuilder transactionContextBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultTransactionContextBuilder(transactionIdGenerator.get(), transactionManager);
+        return new AtomixTransactionContextBuilder(atomix, group.name());
     }
 
     @Override
     public DistributedLockBuilder lockBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultDistributedLockBuilder(federatedPrimitiveCreator);
+        return new AtomixDistributedLockBuilder(atomix, group.name());
     }
 
     @Override
     public LeaderElectorBuilder leaderElectorBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultLeaderElectorBuilder(federatedPrimitiveCreator);
+        return new AtomixLeaderElectorBuilder(atomix, group.name(), clusterService.getLocalNode().id());
     }
 
     @Override
     public <T> TopicBuilder<T> topicBuilder() {
         checkPermission(STORAGE_WRITE);
-        return new DefaultDistributedTopicBuilder<>(atomicValueBuilder());
+        return new AtomixDistributedTopicBuilder<>(atomix, group.name());
+    }
+
+    @Override
+    public <E> WorkQueueBuilder<E> workQueueBuilder() {
+        checkPermission(STORAGE_WRITE);
+        return new AtomixWorkQueueBuilder<>(atomix, group.name());
     }
 
     @Override
     public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
         checkPermission(STORAGE_WRITE);
-        return federatedPrimitiveCreator.newWorkQueue(name, serializer);
+        return this.<E>workQueueBuilder()
+            .withName(name)
+            .withSerializer(serializer)
+            .build();
     }
 
     @Override
     public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
         checkPermission(STORAGE_WRITE);
-        return federatedPrimitiveCreator.newAsyncDocumentTree(name, serializer);
+        return this.<V>documentTreeBuilder()
+            .withName(name)
+            .withSerializer(serializer)
+            .build();
     }
 
     @Override
-    public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(
-            String name, Serializer serializer) {
+    public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(String name, Serializer serializer) {
         checkPermission(STORAGE_WRITE);
-        return federatedPrimitiveCreator.newAsyncConsistentSetMultimap(name,
-                                                                serializer);
+        return new AtomixConsistentMultimapBuilder<K, V>(atomix, group.name())
+            .withName(name)
+            .withSerializer(serializer)
+            .buildMultimap();
     }
 
     @Override
-    public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(
-            String name, Serializer serializer) {
+    public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(String name, Serializer serializer) {
         checkPermission(STORAGE_WRITE);
-        return federatedPrimitiveCreator.newAsyncConsistentTreeMap(name,
-                                                                   serializer);
+        return this.<V>consistentTreeMapBuilder()
+            .withName(name)
+            .withSerializer(serializer)
+            .buildTreeMap();
+    }
+
+    @Override
+    public <T> Topic<T> getTopic(String name, Serializer serializer) {
+        checkPermission(STORAGE_WRITE);
+        return this.<T>topicBuilder()
+            .withName(name)
+            .withSerializer(serializer)
+            .build();
     }
 
     @Override
     public List<MapInfo> getMapInfo() {
-        return listMapInfo(federatedPrimitiveCreator);
+        Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
+        return atomix.getPrimitives(AtomicMapType.instance())
+            .stream()
+            .map(info -> {
+                io.atomix.core.map.AtomicMap<String, byte[]> map =
+                    atomix.<String, byte[]>atomicMapBuilder(info.name())
+                        .withSerializer(new AtomixSerializerAdapter(serializer))
+                        .build();
+                int size = map.size();
+                map.close();
+                return new MapInfo(info.name(), size);
+            }).collect(Collectors.toList());
     }
 
     @Override
     public Map<String, Long> getCounters() {
-        Map<String, Long> counters = Maps.newConcurrentMap();
-        federatedPrimitiveCreator.getAsyncAtomicCounterNames()
-               .forEach(name -> counters.put(name,
-                       federatedPrimitiveCreator.newAsyncCounter(name).asAtomicCounter().get()));
-        return counters;
+        return atomix.getPrimitives(AtomicCounterType.instance())
+            .stream()
+            .map(info -> {
+                AtomicCounter counter = atomix.atomicCounterBuilder(info.name()).build();
+                long value = counter.get();
+                counter.close();
+                return Maps.immutableEntry(info.name(), value);
+            }).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
     }
 
     @Override
     public Map<String, WorkQueueStats> getQueueStats() {
-        Map<String, WorkQueueStats> workQueueStats = Maps.newConcurrentMap();
-        federatedPrimitiveCreator.getWorkQueueNames()
-               .forEach(name -> workQueueStats.put(name,
-                       federatedPrimitiveCreator.newWorkQueue(name,
-                                                              Serializer.using(KryoNamespaces.BASIC))
-                                                .stats()
-                                                .join()));
-        return workQueueStats;
+        Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
+        return atomix.getPrimitives(WorkQueueType.instance())
+            .stream()
+            .map(info -> {
+                io.atomix.core.workqueue.WorkQueue queue = atomix.workQueueBuilder(info.name())
+                    .withSerializer(new AtomixSerializerAdapter(serializer))
+                    .build();
+                io.atomix.core.workqueue.WorkQueueStats stats = queue.stats();
+                queue.close(); // release the temporary handle, as getMapInfo() and getCounters() do
+                return Maps.immutableEntry(info.name(), WorkQueueStats.builder()
+                    .withTotalCompleted(stats.totalCompleted())
+                    .withTotalInProgress(stats.totalInProgress())
+                    .withTotalPending(stats.totalPending()).build());
+            }).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
     }
 
     @Override
@@ -315,27 +348,9 @@
 
     @Override
     public Collection<TransactionId> getPendingTransactions() {
-        return transactionManager.getPendingTransactions();
-    }
-
-    private List<MapInfo> listMapInfo(DistributedPrimitiveCreator creator) {
-        Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
-        return creator.getAsyncConsistentMapNames()
-        .stream()
-        .map(name -> {
-            ConsistentMap<String, byte[]> map =
-                    creator.<String, byte[]>newAsyncConsistentMap(name, serializer)
-                                             .asConsistentMap();
-                    return new MapInfo(name, map.size());
-        }).collect(Collectors.toList());
-    }
-
-    @Override
-    public <T> Topic<T> getTopic(String name, Serializer serializer) {
-        AsyncAtomicValue<T> atomicValue = this.<T>atomicValueBuilder()
-                                              .withName("topic-" + name)
-                                              .withSerializer(serializer)
-                                              .build();
-        return new DefaultDistributedTopic<>(atomicValue);
+        return atomix.getTransactionService().getActiveTransactions()
+            .stream()
+            .map(transactionId -> TransactionId.from(transactionId.id()))
+            .collect(Collectors.toList());
     }
 }