Upgrade to Atomix 3.0-rc5
* Upgrade Raft primitives to Atomix 3.0
* Replace cluster store and messaging implementations with Atomix cluster management/messaging
* Add test scripts for installing/starting Atomix cluster
* Replace core primitives with Atomix primitives.
Change-Id: I7623653c81292a34f21b01f5f38ca11b5ef15cad
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
index 56ea048..edbd1f7 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
@@ -16,15 +16,9 @@
package org.onosproject.cluster.impl;
import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -33,26 +27,20 @@
import org.apache.felix.scr.annotations.Service;
import org.apache.karaf.system.SystemService;
import org.onlab.packet.IpAddress;
-import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterAdminService;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterMetadata;
-import org.onosproject.cluster.ClusterMetadataAdminService;
import org.onosproject.cluster.ClusterMetadataDiff;
import org.onosproject.cluster.ClusterMetadataEvent;
import org.onosproject.cluster.ClusterMetadataEventListener;
-import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.DefaultPartition;
+import org.onosproject.cluster.Node;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.Partition;
-import org.onosproject.cluster.PartitionId;
import org.onosproject.core.Version;
-import org.onosproject.core.VersionService;
import org.onosproject.event.AbstractListenerManager;
import org.slf4j.Logger;
@@ -78,20 +66,11 @@
private ClusterStoreDelegate delegate = new InternalStoreDelegate();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterMetadataService clusterMetadataService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterMetadataAdminService clusterMetadataAdminService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterStore store;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected SystemService systemService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected VersionService versionService;
-
private final AtomicReference<ClusterMetadata> currentMetadata = new AtomicReference<>();
private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
@@ -99,14 +78,11 @@
public void activate() {
store.setDelegate(delegate);
eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
- clusterMetadataService.addListener(metadataListener);
- processMetadata(clusterMetadataService.getClusterMetadata());
log.info("Started");
}
@Deactivate
public void deactivate() {
- clusterMetadataService.removeListener(metadataListener);
store.unsetDelegate(delegate);
eventDispatcher.removeSink(ClusterEvent.class);
log.info("Stopped");
@@ -119,6 +95,12 @@
}
@Override
+ public Set<Node> getConsensusNodes() {
+ checkPermission(CLUSTER_READ);
+ return store.getStorageNodes();
+ }
+
+ @Override
public Set<ControllerNode> getNodes() {
checkPermission(CLUSTER_READ);
return store.getNodes();
@@ -163,25 +145,7 @@
@Override
public void formCluster(Set<ControllerNode> nodes, int partitionSize) {
- checkNotNull(nodes, "Nodes cannot be null");
- checkArgument(!nodes.isEmpty(), "Nodes cannot be empty");
-
- // Validate that the given nodes intersect with the currently configured nodes.
- Set<ControllerNode> existingNodes = Sets.newHashSet(clusterMetadataService.getClusterMetadata().getNodes());
- checkArgument(
- !Sets.intersection(nodes, existingNodes).isEmpty(),
- "Nodes must intersect with current cluster configuration");
-
- ClusterMetadata metadata = new ClusterMetadata("default", nodes, buildDefaultPartitions(nodes, partitionSize));
- clusterMetadataAdminService.setClusterMetadata(metadata);
- try {
- log.warn("Shutting down container for cluster reconfiguration!");
- // Clean up persistent state associated with previous cluster configuration.
- Tools.removeDirectory(System.getProperty("karaf.data") + "/db/partitions/");
- systemService.reboot("now", SystemService.Swipe.NONE);
- } catch (Exception e) {
- log.error("Unable to reboot container", e);
- }
+ log.warn("formCluster is deprecated");
}
@Override
@@ -206,24 +170,6 @@
}
}
- private Set<Partition> buildDefaultPartitions(Collection<ControllerNode> nodes, int partitionSize) {
- List<ControllerNode> sorted = new ArrayList<>(nodes);
- Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString()));
- Set<Partition> partitions = Sets.newHashSet();
- // add partitions
- int length = nodes.size();
- int count = Math.min(partitionSize, length);
- for (int i = 0; i < length; i++) {
- int index = i;
- Set<NodeId> set = new HashSet<>(count);
- for (int j = 0; j < count; j++) {
- set.add(sorted.get((i + j) % length).id());
- }
- partitions.add(new DefaultPartition(PartitionId.from((index + 1)), versionService.version(), set));
- }
- return partitions;
- }
-
/**
* Processes metadata by adding and removing nodes from the cluster.
*/