PartitionManager support for reacting to cluster metadata changes
Change-Id: I65e358f5cb47e9420fae9589661ba0ce45f58df6
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/PartitionAdminService.java b/core/api/src/main/java/org/onosproject/store/primitives/PartitionAdminService.java
index 04d8e99..241c384 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/PartitionAdminService.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/PartitionAdminService.java
@@ -16,9 +16,7 @@
package org.onosproject.store.primitives;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import org.onosproject.cluster.PartitionId;
import org.onosproject.store.service.PartitionInfo;
/**
@@ -31,20 +29,4 @@
* @return list of {@code PartitionInfo}
*/
List<PartitionInfo> partitionInfo();
-
- /**
- * Leaves a partition.
- *
- * @param partitionId partition identifier
- * @return future that is completed when the operation completes.
- */
- CompletableFuture<Void> leave(PartitionId partitionId);
-
- /**
- * Joins a partition.
- *
- * @param partitionId partition identifier
- * @return future that is completed when the operation completes.
- */
- CompletableFuture<Void> join(PartitionId partitionId);
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 45bd171..a083a8b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
@@ -32,9 +33,14 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterMetadata;
+import org.onosproject.cluster.ClusterMetadataDiff;
+import org.onosproject.cluster.ClusterMetadataEvent;
+import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.PartitionDiff;
import org.onosproject.cluster.PartitionId;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.store.cluster.messaging.MessagingService;
@@ -68,15 +74,19 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
- Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
+ private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
+ private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
+ private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
@Activate
public void activate() {
eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
-
- metadataService.getClusterMetadata()
+ currentClusterMetadata.set(metadataService.getClusterMetadata());
+ metadataService.addListener(metadataListener);
+ currentClusterMetadata.get()
.getPartitions()
.stream()
+ .filter(partition -> !partition.getId().equals(PartitionId.from(0))) // exclude p0
.forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
messagingService,
clusterService,
@@ -93,6 +103,7 @@
@Deactivate
public void deactivate() {
+ metadataService.removeListener(metadataListener);
eventDispatcher.removeSink(PartitionEvent.class);
CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
@@ -104,20 +115,6 @@
}
@Override
- public CompletableFuture<Void> leave(PartitionId partitionId) {
- return partitions.get(partitionId)
- .server()
- .map(server -> server.close())
- .orElse(CompletableFuture.completedFuture(null));
- }
-
- @Override
- public CompletableFuture<Void> join(PartitionId partitionId) {
- return partitions.get(partitionId)
- .open();
- }
-
- @Override
public int getNumberOfPartitions() {
return partitions.size();
}
@@ -152,4 +149,23 @@
.flatMap(x -> Tools.stream(x.info()))
.collect(Collectors.toList());
}
+
+ private void processMetadataUpdate(ClusterMetadata clusterMetadata) {
+ ClusterMetadataDiff diffExaminer =
+ new ClusterMetadataDiff(currentClusterMetadata.get(), clusterMetadata);
+ diffExaminer.partitionDiffs()
+ .values()
+ .stream()
+ // TODO: Remove after partition 0 is removed from cluster metadata.
+ .filter(diff -> !diff.partitionId().equals(PartitionId.from(0)))
+ .filter(PartitionDiff::hasChanged)
+ .forEach(diff -> partitions.get(diff.partitionId()).onUpdate(diff.newValue()));
+ }
+
+ private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
+ @Override
+ public void event(ClusterMetadataEvent event) {
+ processMetadataUpdate(event.subject());
+ }
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 66013c0..5cdc5a9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -28,9 +28,9 @@
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.DefaultPartition;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
+import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
@@ -42,7 +42,7 @@
/**
* Storage partition.
*/
-public class StoragePartition extends DefaultPartition implements Managed<StoragePartition> {
+public class StoragePartition implements Managed<StoragePartition> {
private final AtomicBoolean isOpened = new AtomicBoolean(false);
private final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -50,14 +50,14 @@
private final MessagingService messagingService;
private final ClusterService clusterService;
private final File logFolder;
- private CompletableFuture<StoragePartitionServer> serverOpenFuture;
+ private Partition partition;
private static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
new ResourceType(DistributedLong.class),
new ResourceType(AtomixLeaderElector.class),
new ResourceType(AtomixConsistentMap.class));
private NodeId localNodeId;
- private Optional<StoragePartitionServer> server = Optional.empty();
+ private StoragePartitionServer server;
private StoragePartitionClient client;
public StoragePartition(Partition partition,
@@ -65,7 +65,7 @@
ClusterService clusterService,
Serializer serializer,
File logFolder) {
- super(partition);
+ this.partition = partition;
this.messagingService = messagingService;
this.clusterService = clusterService;
this.localNodeId = clusterService.getLocalNode().id();
@@ -81,61 +81,86 @@
return client;
}
- /**
- * Returns the optional server instance.
- * @return server
- */
- public Optional<StoragePartitionServer> server() {
- return server;
- }
-
@Override
public CompletableFuture<Void> open() {
- serverOpenFuture = openServer();
- serverOpenFuture.thenAccept(s -> server = Optional.ofNullable(s));
+ openServer();
return openClient().thenAccept(v -> isOpened.set(true))
.thenApply(v -> null);
}
@Override
public CompletableFuture<Void> close() {
- return closeClient().thenCompose(v -> closeServer())
- .thenAccept(v -> isClosed.set(true))
+ // We do not explicitly close the server and instead let the cluster
+ // deal with this as an unclean exit.
+ return closeClient().thenAccept(v -> isClosed.set(true))
.thenApply(v -> null);
}
- public Collection<Address> getMemberAddresses() {
- return Collections2.transform(getMembers(), this::toAddress);
+ /**
+ * Returns the identifier of the {@link Partition partition} associated with this instance.
+ * @return partition identifier
+ */
+ public PartitionId getId() {
+ return partition.getId();
}
- private CompletableFuture<StoragePartitionServer> openServer() {
- if (!getMembers().contains(localNodeId)) {
+ /**
+ * Returns the identifiers of partition members.
+ * @return partition member instance ids
+ */
+ public Collection<NodeId> getMembers() {
+ return partition.getMembers();
+ }
+
+ /**
+ * Returns the {@link Address addresses} of partition members.
+ * @return partition member addresses
+ */
+ public Collection<Address> getMemberAddresses() {
+ return Collections2.transform(partition.getMembers(), this::toAddress);
+ }
+
+ private CompletableFuture<Void> openServer() {
+ if (!partition.getMembers().contains(localNodeId) || server != null) {
return CompletableFuture.completedFuture(null);
}
StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
this,
serializer,
() -> new CopycatTransport(CopycatTransport.Mode.SERVER,
- getId(),
+ partition.getId(),
messagingService),
RESOURCE_TYPES,
logFolder);
- return server.open().thenApply(v -> server);
+ return server.open().thenRun(() -> this.server = server);
}
private CompletableFuture<StoragePartitionClient> openClient() {
client = new StoragePartitionClient(this,
serializer,
new CopycatTransport(CopycatTransport.Mode.CLIENT,
- getId(),
+ partition.getId(),
messagingService),
RESOURCE_TYPES);
return client.open().thenApply(v -> client);
}
- private CompletableFuture<Void> closeServer() {
- return server.map(StoragePartitionServer::close)
- .orElse(CompletableFuture.completedFuture(null));
+ /**
+ * Closes the partition server if it was previously opened.
+ * @return future that is completed when the operation completes
+ */
+ public CompletableFuture<Void> closeServer() {
+ return server != null ? server.closeAndExit() : CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public boolean isOpen() {
+ return isOpened.get() && !isClosed.get();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return isClosed.get();
}
private CompletableFuture<Void> closeClient() {
@@ -150,22 +175,21 @@
return new Address(node.ip().toString(), node.tcpPort());
}
- @Override
- public boolean isOpen() {
- return !isClosed.get() && isOpened.get();
- }
-
- @Override
- public boolean isClosed() {
- return isOpened.get() && isClosed.get();
- }
-
/**
* Returns the partition information if this partition is locally managed i.e.
* this node is a active member of the partition.
* @return partition info
*/
public Optional<PartitionInfo> info() {
- return server.map(StoragePartitionServer::info);
+ return server != null ? Optional.of(server.info()) : Optional.empty();
+ }
+
+ public void onUpdate(Partition partition) {
+ this.partition = partition;
+ if (partition.getMembers().contains(localNodeId)) {
+ openServer();
+ } else if (!partition.getMembers().contains(localNodeId)) {
+ closeServer();
+ }
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index e6669dc..6d613c3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -31,16 +31,13 @@
import java.io.File;
import java.util.Collection;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
-import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.PartitionInfo;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
/**
* {@link StoragePartition} server.
@@ -80,7 +77,7 @@
return CompletableFuture.completedFuture(null);
}
synchronized (this) {
- server = server();
+ server = buildServer();
}
serverOpenFuture = server.open();
} else {
@@ -97,13 +94,22 @@
@Override
public CompletableFuture<Void> close() {
- // We do not close the server because doing so is equivalent to this node
- // leaving the cluster and we don't want that here.
- // The Raft protocol should take care of servers leaving unannounced.
- return CompletableFuture.completedFuture(null);
+ /**
+ * CopycatServer#kill just shuts down the server and does not result
+ * in any cluster membership changes.
+ */
+ return server.kill();
}
- private CopycatServer server() {
+ /**
+ * Closes the server and exits the partition.
+ * @return future that is completed when the operation is complete
+ */
+ public CompletableFuture<Void> closeAndExit() {
+ return server.close();
+ }
+
+ private CopycatServer buildServer() {
ResourceTypeResolver resourceResolver = new ServiceLoaderResourceResolver();
ResourceRegistry registry = new ResourceRegistry();
resourceTypes.forEach(registry::register);
@@ -124,10 +130,6 @@
return server;
}
- public Set<NodeId> configuredMembers() {
- return Sets.newHashSet(partition.getMembers());
- }
-
@Override
public boolean isOpen() {
return server.isOpen();