ClusterManager support for reacting to cluster metadata changes
Change-Id: I7befaf4f955bda093d89c3c431eae6814409ae03
diff --git a/core/api/src/main/java/org/onosproject/cluster/ClusterMetadataDiff.java b/core/api/src/main/java/org/onosproject/cluster/ClusterMetadataDiff.java
new file mode 100644
index 0000000..1b42fc5
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/ClusterMetadataDiff.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Utility for examining differences between two {@link ClusterMetadata metadata} values.
+ */
+public class ClusterMetadataDiff {
+
+ private final ClusterMetadata oldValue;
+ private final ClusterMetadata newValue;
+ private final Set<ControllerNode> nodesAdded;
+ private final Set<NodeId> nodesRemoved;
+
+ public ClusterMetadataDiff(ClusterMetadata oldValue, ClusterMetadata newValue) {
+ this.oldValue = oldValue;
+ this.newValue = newValue;
+
+ Set<ControllerNode> currentNodeSet = oldValue == null
+ ? ImmutableSet.of() : ImmutableSet.copyOf(oldValue.getNodes());
+ Set<ControllerNode> newNodeSet = newValue == null
+ ? ImmutableSet.of() : ImmutableSet.copyOf(newValue.getNodes());
+ nodesAdded = Sets.difference(newNodeSet, currentNodeSet);
+ nodesRemoved = Sets.difference(currentNodeSet, newNodeSet)
+ .stream()
+ .map(ControllerNode::id)
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Returns the set of {@link ControllerNode nodes} added with this metadata change.
+ * @return set of controller nodes
+ */
+ public Set<ControllerNode> nodesAdded() {
+ return nodesAdded;
+ }
+
+ /**
+ * Returns the set of {@link ControllerNode nodes} removed with this metadata change.
+ * @return set of controller node identifiers
+ */
+ public Set<NodeId> nodesRemoved() {
+ return nodesRemoved;
+ }
+
+ /**
+ * Returns a mapping of all partition diffs.
+ * @return partition diffs.
+ */
+ public Map<PartitionId, PartitionDiff> partitionDiffs() {
+ Map<PartitionId, Partition> oldPartitions = Maps.newHashMap();
+ oldValue.getPartitions()
+ .forEach(p -> oldPartitions.put(p.getId(), p));
+ Map<PartitionId, Partition> newPartitions = Maps.newHashMap();
+ newValue.getPartitions()
+ .forEach(p -> newPartitions.put(p.getId(), p));
+ checkState(Sets.symmetricDifference(oldPartitions.keySet(), newPartitions.keySet()).isEmpty(),
+ "Number of partitions cannot change");
+ Map<PartitionId, PartitionDiff> partitionDiffs = Maps.newHashMap();
+ oldPartitions.forEach((k, v) -> {
+ partitionDiffs.put(k, new PartitionDiff(v, newPartitions.get(k)));
+ });
+ return partitionDiffs;
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/cluster/PartitionDiff.java b/core/api/src/main/java/org/onosproject/cluster/PartitionDiff.java
new file mode 100644
index 0000000..aaa67de
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/PartitionDiff.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * Utility for examining differences between two {@link Partition partition} values.
+ */
+public class PartitionDiff {
+
+ private final Partition oldValue;
+ private final Partition newValue;
+ private final PartitionId partitionId;
+ private final Set<NodeId> currentMembers;
+ private final Set<NodeId> newMembers;
+
+ public PartitionDiff(Partition oldValue, Partition newValue) {
+ this.oldValue = oldValue;
+ this.newValue = newValue;
+ this.partitionId = oldValue.getId();
+ this.currentMembers = oldValue == null ? ImmutableSet.of() : ImmutableSet.copyOf(oldValue.getMembers());
+ this.newMembers = newValue == null ? ImmutableSet.of() : ImmutableSet.copyOf(newValue.getMembers());
+ }
+
+ /**
+ * Returns the new partition identifier.
+ * @return partition id
+ */
+ public PartitionId partitionId() {
+ return partitionId;
+ }
+
+ /**
+ * Returns the old partition value.
+ * @return partition
+ */
+ public Partition oldValue() {
+ return oldValue;
+ }
+
+ /**
+ * Returns the new partition value.
+ * @return partition
+ */
+ public Partition newValue() {
+ return newValue;
+ }
+
+ /**
+ * Returns if there are differences between the two values.
+ * @return {@code true} if yes; {@code false} otherwise
+ */
+ public boolean hasChanged() {
+ return !Sets.symmetricDifference(currentMembers, newMembers).isEmpty();
+ }
+
+ /**
+ * Returns if the specified node is introduced in the new value.
+ * @param nodeId node identifier
+ * @return {@code true} if yes; {@code false} otherwise
+ */
+ public boolean isAdded(NodeId nodeId) {
+ return !currentMembers.contains(nodeId) && newMembers.contains(nodeId);
+ }
+
+ /**
+ * Returns if the specified node is removed in the new value.
+ * @param nodeId node identifier
+ * @return {@code true} if yes; {@code false} otherwise
+ */
+ public boolean isRemoved(NodeId nodeId) {
+ return currentMembers.contains(nodeId) && !newMembers.contains(nodeId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(oldValue, newValue);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null || !(other instanceof PartitionDiff)) {
+ return false;
+ }
+ PartitionDiff that = (PartitionDiff) other;
+ return Objects.equals(this.oldValue, that.oldValue) &&
+ Objects.equals(this.newValue, that.newValue);
+
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("oldValue", oldValue)
+ .add("newValue", newValue)
+ .toString();
+ }
+}
diff --git a/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataDiffTest.java b/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataDiffTest.java
new file mode 100644
index 0000000..32739c4
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataDiffTest.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+import org.onlab.packet.IpAddress;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests for ClusterMetadataDiff.
+ */
+public class ClusterMetadataDiffTest {
+
+ @Test
+ public void testDiffNoChange() {
+ PartitionId pid1 = PartitionId.from(1);
+ NodeId nid1 = NodeId.nodeId("10.0.0.1");
+ ControllerNode n1 = new DefaultControllerNode(nid1, IpAddress.valueOf("10.0.0.1"), 9876);
+ Partition p1 = new DefaultPartition(pid1, ImmutableSet.of(nid1));
+ ClusterMetadata md1 = new ClusterMetadata("foo", ImmutableSet.of(n1), ImmutableSet.of(p1));
+ ClusterMetadataDiff diff = new ClusterMetadataDiff(md1, md1);
+ assertTrue(diff.nodesAdded().isEmpty());
+ assertTrue(diff.nodesRemoved().isEmpty());
+ assertEquals(diff.partitionDiffs().size(), 1);
+ assertEquals(diff.partitionDiffs().keySet(), Sets.newHashSet(pid1));
+ PartitionDiff pdiff = diff.partitionDiffs().get(pid1);
+ assertFalse(pdiff.hasChanged());
+ }
+
+ @Test
+ public void testDiffForScaleUp() {
+ PartitionId pid1 = PartitionId.from(1);
+ NodeId nid1 = NodeId.nodeId("10.0.0.1");
+ NodeId nid2 = NodeId.nodeId("10.0.0.2");
+ ControllerNode n1 = new DefaultControllerNode(nid1, IpAddress.valueOf("10.0.0.1"), 9876);
+ ControllerNode n2 = new DefaultControllerNode(nid2, IpAddress.valueOf("10.0.0.2"), 9876);
+ Partition p1 = new DefaultPartition(pid1, ImmutableSet.of(nid1));
+ Partition p12 = new DefaultPartition(pid1, ImmutableSet.of(nid1, nid2));
+ ClusterMetadata md1 = new ClusterMetadata("foo", ImmutableSet.of(n1), ImmutableSet.of(p1));
+ ClusterMetadata md12 = new ClusterMetadata("foo", ImmutableSet.of(n1, n2), ImmutableSet.of(p12));
+ ClusterMetadataDiff diff = new ClusterMetadataDiff(md1, md12);
+ assertEquals(diff.nodesAdded(), Sets.newHashSet(n2));
+ assertTrue(diff.nodesRemoved().isEmpty());
+ assertEquals(diff.partitionDiffs().size(), 1);
+ assertEquals(diff.partitionDiffs().keySet(), Sets.newHashSet(pid1));
+ PartitionDiff pdiff = diff.partitionDiffs().get(pid1);
+ assertTrue(pdiff.hasChanged());
+ assertFalse(pdiff.isAdded(nid1));
+ assertTrue(pdiff.isAdded(nid2));
+ assertFalse(pdiff.isRemoved(nid1));
+ assertFalse(pdiff.isAdded(nid1));
+ }
+
+ @Test
+ public void testDiffForScaleDown() {
+ PartitionId pid1 = PartitionId.from(1);
+ NodeId nid1 = NodeId.nodeId("10.0.0.1");
+ NodeId nid2 = NodeId.nodeId("10.0.0.2");
+ ControllerNode n1 = new DefaultControllerNode(nid1, IpAddress.valueOf("10.0.0.1"), 9876);
+ ControllerNode n2 = new DefaultControllerNode(nid2, IpAddress.valueOf("10.0.0.2"), 9876);
+ Partition p1 = new DefaultPartition(pid1, ImmutableSet.of(nid1));
+ Partition p12 = new DefaultPartition(pid1, ImmutableSet.of(nid1, nid2));
+ ClusterMetadata md1 = new ClusterMetadata("foo", ImmutableSet.of(n1), ImmutableSet.of(p1));
+ ClusterMetadata md12 = new ClusterMetadata("foo", ImmutableSet.of(n1, n2), ImmutableSet.of(p12));
+ ClusterMetadataDiff diff = new ClusterMetadataDiff(md12, md1);
+ assertEquals(diff.nodesRemoved(), Sets.newHashSet(nid2));
+ assertTrue(diff.nodesAdded().isEmpty());
+ assertEquals(diff.partitionDiffs().size(), 1);
+ assertEquals(diff.partitionDiffs().keySet(), Sets.newHashSet(pid1));
+ PartitionDiff pdiff = diff.partitionDiffs().get(pid1);
+ assertTrue(pdiff.hasChanged());
+ assertTrue(pdiff.isRemoved(nid2));
+ assertFalse(pdiff.isAdded(nid2));
+ assertFalse(pdiff.isRemoved(nid1));
+ assertFalse(pdiff.isAdded(nid1));
+ }
+}
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
index 1dcff08..904fdff 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
@@ -29,6 +29,9 @@
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataAdminService;
+import org.onosproject.cluster.ClusterMetadataDiff;
+import org.onosproject.cluster.ClusterMetadataEvent;
+import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ClusterStore;
@@ -50,6 +53,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -83,18 +87,21 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected SystemService systemService;
+ private final AtomicReference<ClusterMetadata> currentMetadata = new AtomicReference<>();
+ private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
+
@Activate
public void activate() {
store.setDelegate(delegate);
eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
- clusterMetadataService.getClusterMetadata()
- .getNodes()
- .forEach(node -> store.addNode(node.id(), node.ip(), node.tcpPort()));
+ clusterMetadataService.addListener(metadataListener);
+ processMetadata(clusterMetadataService.getClusterMetadata());
log.info("Started");
}
@Deactivate
public void deactivate() {
+ clusterMetadataService.removeListener(metadataListener);
store.unsetDelegate(delegate);
eventDispatcher.removeSink(ClusterEvent.class);
log.info("Stopped");
@@ -190,4 +197,25 @@
}
return partitions;
}
+
+ /**
+ * Processes metadata by adding and removing nodes from the cluster.
+ */
+ private synchronized void processMetadata(ClusterMetadata metadata) {
+ try {
+ ClusterMetadataDiff examiner =
+ new ClusterMetadataDiff(currentMetadata.get(), metadata);
+ examiner.nodesAdded().forEach(node -> addNode(node.id(), node.ip(), node.tcpPort()));
+ examiner.nodesRemoved().forEach(this::removeNode);
+ } finally {
+ currentMetadata.set(metadata);
+ }
+ }
+
+ private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
+ @Override
+ public void event(ClusterMetadataEvent event) {
+ processMetadata(event.subject());
+ }
+ }
}