Upgrade to Atomix 3.0-rc5

* Upgrade Raft primitives to Atomix 3.0
* Replace cluster store and messaging implementations with Atomix cluster management/messaging
* Add test scripts for installing/starting Atomix cluster
* Replace core primitives with Atomix primitives

Change-Id: I7623653c81292a34f21b01f5f38ca11b5ef15cad
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 719a96e..dd71e38 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -18,11 +18,17 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.common.collect.Maps;
+import io.atomix.core.Atomix;
+import io.atomix.core.counter.AtomicCounter;
+import io.atomix.core.counter.AtomicCounterType;
+import io.atomix.core.map.AtomicMapType;
+import io.atomix.core.workqueue.WorkQueueType;
+import io.atomix.primitive.partition.PartitionGroup;
+import io.atomix.protocols.raft.MultiRaftProtocol;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -34,15 +40,12 @@
import org.onosproject.cluster.Member;
import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.PartitionId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.primitives.DistributedPrimitiveCreator;
+import org.onosproject.store.impl.AtomixManager;
import org.onosproject.store.primitives.PartitionAdminService;
-import org.onosproject.store.primitives.PartitionService;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMultimap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AsyncDocumentTree;
@@ -50,7 +53,6 @@
import org.onosproject.store.service.AtomicCounterMapBuilder;
import org.onosproject.store.service.AtomicIdGeneratorBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
-import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.ConsistentMultimapBuilder;
import org.onosproject.store.service.ConsistentTreeMapBuilder;
@@ -68,6 +70,7 @@
import org.onosproject.store.service.TopicBuilder;
import org.onosproject.store.service.TransactionContextBuilder;
import org.onosproject.store.service.WorkQueue;
+import org.onosproject.store.service.WorkQueueBuilder;
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
@@ -82,8 +85,6 @@
@Component(immediate = true)
public class StorageManager implements StorageService, StorageAdminService {
- private static final int BUCKETS = 128;
-
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -96,26 +97,21 @@
protected PersistenceService persistenceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected PartitionService partitionService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PartitionAdminService partitionAdminService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MembershipService membershipService;
- private final Supplier<TransactionId> transactionIdGenerator =
- () -> TransactionId.from(UUID.randomUUID().toString());
- private DistributedPrimitiveCreator federatedPrimitiveCreator;
- private TransactionManager transactionManager;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected AtomixManager atomixManager;
+
+ private Atomix atomix;
+ private PartitionGroup group;
@Activate
public void activate() {
- Map<PartitionId, DistributedPrimitiveCreator> partitionMap = Maps.newHashMap();
- partitionService.getAllPartitionIds().stream()
- .forEach(id -> partitionMap.put(id, partitionService.getDistributedPrimitiveCreator(id)));
- federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap, BUCKETS);
- transactionManager = new TransactionManager(this, partitionService, BUCKETS);
+ atomix = atomixManager.getAtomix();
+ group = atomix.getPartitionService().getPartitionGroup(MultiRaftProtocol.TYPE);
log.info("Started");
}
@@ -137,175 +133,212 @@
// Use the MembershipService to provide peers for the map that are isolated within the current version.
Supplier<List<NodeId>> peersSupplier = () -> membershipService.getMembers().stream()
- .map(Member::nodeId)
- .filter(nodeId -> !nodeId.equals(localNodeId))
- .filter(id -> clusterService.getState(id).isActive())
- .collect(Collectors.toList());
+ .map(Member::nodeId)
+ .filter(nodeId -> !nodeId.equals(localNodeId))
+ .filter(id -> clusterService.getState(id).isActive())
+ .collect(Collectors.toList());
// If this is the first node in its version, bootstrap from the previous version. Otherwise, bootstrap the
// map from members isolated within the current version.
Supplier<List<NodeId>> bootstrapPeersSupplier = () -> {
if (membershipService.getMembers().size() == 1) {
return clusterService.getNodes()
- .stream()
- .map(ControllerNode::id)
- .filter(id -> !localNodeId.equals(id))
- .filter(id -> clusterService.getState(id).isActive())
- .collect(Collectors.toList());
+ .stream()
+ .map(ControllerNode::id)
+ .filter(id -> !localNodeId.equals(id))
+ .filter(id -> clusterService.getState(id).isActive())
+ .collect(Collectors.toList());
} else {
return membershipService.getMembers()
- .stream()
- .map(Member::nodeId)
- .filter(id -> !localNodeId.equals(id))
- .filter(id -> clusterService.getState(id).isActive())
- .collect(Collectors.toList());
+ .stream()
+ .map(Member::nodeId)
+ .filter(id -> !localNodeId.equals(id))
+ .filter(id -> clusterService.getState(id).isActive())
+ .collect(Collectors.toList());
}
};
return new EventuallyConsistentMapBuilderImpl<>(
- localNodeId,
- clusterCommunicator,
- persistenceService,
- peersSupplier,
- bootstrapPeersSupplier
+ localNodeId,
+ clusterCommunicator,
+ persistenceService,
+ peersSupplier,
+ bootstrapPeersSupplier
);
}
@Override
public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultConsistentMapBuilder<>(federatedPrimitiveCreator);
+ return new AtomixConsistentMapBuilder<>(atomix, group.name());
}
@Override
public <V> DocumentTreeBuilder<V> documentTreeBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultDocumentTreeBuilder<V>(federatedPrimitiveCreator);
+ return new AtomixDocumentTreeBuilder<V>(atomix, group.name());
}
@Override
public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
- return new DefaultConsistentTreeMapBuilder<V>(
- federatedPrimitiveCreator);
+ return new AtomixConsistentTreeMapBuilder<>(atomix, group.name());
}
@Override
public <K, V> ConsistentMultimapBuilder<K, V> consistentMultimapBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultConsistentMultimapBuilder<K, V>(
- federatedPrimitiveCreator);
+ return new AtomixConsistentMultimapBuilder<>(atomix, group.name());
}
@Override
public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultAtomicCounterMapBuilder<>(federatedPrimitiveCreator);
+ return new AtomixAtomicCounterMapBuilder<>(atomix, group.name());
}
@Override
public <E> DistributedSetBuilder<E> setBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultDistributedSetBuilder<>(() -> this.<E, Boolean>consistentMapBuilder());
+ return new AtomixDistributedSetBuilder<>(atomix, group.name());
}
@Override
public AtomicCounterBuilder atomicCounterBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultAtomicCounterBuilder(federatedPrimitiveCreator);
+ return new AtomixAtomicCounterBuilder(atomix, group.name());
}
@Override
public AtomicIdGeneratorBuilder atomicIdGeneratorBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultAtomicIdGeneratorBuilder(federatedPrimitiveCreator);
+ return new AtomixAtomicIdGeneratorBuilder(atomix, group.name());
}
@Override
public <V> AtomicValueBuilder<V> atomicValueBuilder() {
checkPermission(STORAGE_WRITE);
- Supplier<ConsistentMapBuilder<String, byte[]>> mapBuilderSupplier =
- () -> this.<String, byte[]>consistentMapBuilder()
- .withName("onos-atomic-values")
- .withSerializer(Serializer.using(KryoNamespaces.BASIC));
- return new DefaultAtomicValueBuilder<>(mapBuilderSupplier);
+ return new AtomixAtomicValueBuilder<>(atomix, group.name());
}
@Override
public TransactionContextBuilder transactionContextBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultTransactionContextBuilder(transactionIdGenerator.get(), transactionManager);
+ return new AtomixTransactionContextBuilder(atomix, group.name());
}
@Override
public DistributedLockBuilder lockBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultDistributedLockBuilder(federatedPrimitiveCreator);
+ return new AtomixDistributedLockBuilder(atomix, group.name());
}
@Override
public LeaderElectorBuilder leaderElectorBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultLeaderElectorBuilder(federatedPrimitiveCreator);
+ return new AtomixLeaderElectorBuilder(atomix, group.name(), clusterService.getLocalNode().id());
}
@Override
public <T> TopicBuilder<T> topicBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultDistributedTopicBuilder<>(atomicValueBuilder());
+ return new AtomixDistributedTopicBuilder<>(atomix, group.name());
+ }
+
+ @Override
+ public <E> WorkQueueBuilder<E> workQueueBuilder() {
+ checkPermission(STORAGE_WRITE);
+ return new AtomixWorkQueueBuilder<>(atomix, group.name());
}
@Override
public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
checkPermission(STORAGE_WRITE);
- return federatedPrimitiveCreator.newWorkQueue(name, serializer);
+ return this.<E>workQueueBuilder()
+ .withName(name)
+ .withSerializer(serializer)
+ .build();
}
@Override
public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
checkPermission(STORAGE_WRITE);
- return federatedPrimitiveCreator.newAsyncDocumentTree(name, serializer);
+ return this.<V>documentTreeBuilder()
+ .withName(name)
+ .withSerializer(serializer)
+ .build();
}
@Override
- public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(
- String name, Serializer serializer) {
+ public <K, V> AsyncConsistentMultimap<K, V> getAsyncSetMultimap(String name, Serializer serializer) {
checkPermission(STORAGE_WRITE);
- return federatedPrimitiveCreator.newAsyncConsistentSetMultimap(name,
- serializer);
+ return new AtomixConsistentMultimapBuilder<K, V>(atomix, group.name())
+ .withName(name)
+ .withSerializer(serializer)
+ .buildMultimap();
}
@Override
- public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(
- String name, Serializer serializer) {
+ public <V> AsyncConsistentTreeMap<V> getAsyncTreeMap(String name, Serializer serializer) {
checkPermission(STORAGE_WRITE);
- return federatedPrimitiveCreator.newAsyncConsistentTreeMap(name,
- serializer);
+ return this.<V>consistentTreeMapBuilder()
+ .withName(name)
+ .withSerializer(serializer)
+ .buildTreeMap();
+ }
+
+ @Override
+ public <T> Topic<T> getTopic(String name, Serializer serializer) {
+ checkPermission(STORAGE_WRITE);
+ return this.<T>topicBuilder()
+ .withName(name)
+ .withSerializer(serializer)
+ .build();
}
@Override
public List<MapInfo> getMapInfo() {
- return listMapInfo(federatedPrimitiveCreator);
+ Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
+ return atomix.getPrimitives(AtomicMapType.instance())
+ .stream()
+ .map(info -> {
+ io.atomix.core.map.AtomicMap<String, byte[]> map =
+ atomix.<String, byte[]>atomicMapBuilder(info.name())
+ .withSerializer(new AtomixSerializerAdapter(serializer))
+ .build();
+ int size = map.size();
+ map.close();
+ return new MapInfo(info.name(), size);
+ }).collect(Collectors.toList());
}
@Override
public Map<String, Long> getCounters() {
- Map<String, Long> counters = Maps.newConcurrentMap();
- federatedPrimitiveCreator.getAsyncAtomicCounterNames()
- .forEach(name -> counters.put(name,
- federatedPrimitiveCreator.newAsyncCounter(name).asAtomicCounter().get()));
- return counters;
+ return atomix.getPrimitives(AtomicCounterType.instance())
+ .stream()
+ .map(info -> {
+ AtomicCounter counter = atomix.atomicCounterBuilder(info.name()).build();
+ long value = counter.get();
+ counter.close();
+ return Maps.immutableEntry(info.name(), value);
+ }).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
}
@Override
public Map<String, WorkQueueStats> getQueueStats() {
- Map<String, WorkQueueStats> workQueueStats = Maps.newConcurrentMap();
- federatedPrimitiveCreator.getWorkQueueNames()
- .forEach(name -> workQueueStats.put(name,
- federatedPrimitiveCreator.newWorkQueue(name,
- Serializer.using(KryoNamespaces.BASIC))
- .stats()
- .join()));
- return workQueueStats;
+ // For every work queue primitive known to Atomix, build a temporary handle, read its
+ // stats, and convert them into the ONOS WorkQueueStats type keyed by primitive name.
+ Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
+ return atomix.getPrimitives(WorkQueueType.instance())
+ .stream()
+ .map(info -> {
+ io.atomix.core.workqueue.WorkQueue queue = atomix.workQueueBuilder(info.name())
+ .withSerializer(new AtomixSerializerAdapter(serializer))
+ .build();
+ io.atomix.core.workqueue.WorkQueueStats stats = queue.stats();
+ // Release the temporary handle once the stats have been read, mirroring how
+ // getMapInfo() and getCounters() close the primitives they build.
+ queue.close();
+ return Maps.immutableEntry(info.name(), WorkQueueStats.builder()
+ .withTotalCompleted(stats.totalCompleted())
+ .withTotalInProgress(stats.totalInProgress())
+ .withTotalPending(stats.totalPending())
+ .build());
+ }).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
}
@Override
@@ -315,27 +348,9 @@
@Override
public Collection<TransactionId> getPendingTransactions() {
- return transactionManager.getPendingTransactions();
- }
-
- private List<MapInfo> listMapInfo(DistributedPrimitiveCreator creator) {
- Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
- return creator.getAsyncConsistentMapNames()
- .stream()
- .map(name -> {
- ConsistentMap<String, byte[]> map =
- creator.<String, byte[]>newAsyncConsistentMap(name, serializer)
- .asConsistentMap();
- return new MapInfo(name, map.size());
- }).collect(Collectors.toList());
- }
-
- @Override
- public <T> Topic<T> getTopic(String name, Serializer serializer) {
- AsyncAtomicValue<T> atomicValue = this.<T>atomicValueBuilder()
- .withName("topic-" + name)
- .withSerializer(serializer)
- .build();
- return new DefaultDistributedTopic<>(atomicValue);
+ return atomix.getTransactionService().getActiveTransactions()
+ .stream()
+ .map(transactionId -> TransactionId.from(transactionId.id()))
+ .collect(Collectors.toList());
}
}