ClusterManager support for reacting to cluster metadata changes
Change-Id: I7befaf4f955bda093d89c3c431eae6814409ae03
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
index 1dcff08..904fdff 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
@@ -29,6 +29,9 @@
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataAdminService;
+import org.onosproject.cluster.ClusterMetadataDiff;
+import org.onosproject.cluster.ClusterMetadataEvent;
+import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ClusterStore;
@@ -50,6 +53,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -83,18 +87,21 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected SystemService systemService;
+ private final AtomicReference<ClusterMetadata> currentMetadata = new AtomicReference<>(); // metadata recorded by the last processMetadata call; null until then
+ private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener(); // added in activate(), removed in deactivate()
+
@Activate
public void activate() {
store.setDelegate(delegate);
eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
- clusterMetadataService.getClusterMetadata()
- .getNodes()
- .forEach(node -> store.addNode(node.id(), node.ip(), node.tcpPort()));
+ clusterMetadataService.addListener(metadataListener); // listener is registered before the initial metadata is read below
+ processMetadata(clusterMetadataService.getClusterMetadata()); // initial pass over the metadata available at activation
log.info("Started");
}
@Deactivate
public void deactivate() {
+ clusterMetadataService.removeListener(metadataListener);
store.unsetDelegate(delegate);
eventDispatcher.removeSink(ClusterEvent.class);
log.info("Stopped");
@@ -190,4 +197,25 @@
}
return partitions;
}
+
+    /**
+     * Brings cluster membership in line with the supplied metadata.
+     * <p>
+     * The supplied metadata is compared against the metadata recorded by the
+     * previous invocation; every node the comparison reports as added is passed
+     * to {@code addNode} and every node it reports as removed is passed to
+     * {@code removeNode}. The supplied metadata is then recorded as current,
+     * even when one of those calls throws.
+     *
+     * @param metadata cluster metadata to reconcile against
+     */
+    private synchronized void processMetadata(ClusterMetadata metadata) {
+        ClusterMetadata previous = currentMetadata.get();
+        try {
+            ClusterMetadataDiff diff = new ClusterMetadataDiff(previous, metadata);
+            diff.nodesAdded().forEach(added -> addNode(added.id(), added.ip(), added.tcpPort()));
+            diff.nodesRemoved().forEach(removed -> removeNode(removed));
+        } finally {
+            // Always remember the latest metadata, whether or not the updates above succeeded.
+            currentMetadata.set(metadata);
+        }
+    }
+
+    /**
+     * Listener that hands the subject of each received cluster metadata event
+     * to {@code processMetadata}.
+     */
+    private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
+        @Override
+        public void event(ClusterMetadataEvent event) {
+            ClusterMetadata updated = event.subject();
+            processMetadata(updated);
+        }
+    }
}