ClusterManager support for reacting to cluster metadata changes

Change-Id: I7befaf4f955bda093d89c3c431eae6814409ae03
diff --git a/core/api/src/main/java/org/onosproject/cluster/ClusterMetadataDiff.java b/core/api/src/main/java/org/onosproject/cluster/ClusterMetadataDiff.java
new file mode 100644
index 0000000..1b42fc5
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/ClusterMetadataDiff.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Utility for examining differences between two {@link ClusterMetadata metadata} values.
+ */
+public class ClusterMetadataDiff {
+
+    private final ClusterMetadata oldValue;
+    private final ClusterMetadata newValue;
+    private final Set<ControllerNode> nodesAdded;
+    private final Set<NodeId> nodesRemoved;
+
+    /**
+     * Creates a diff of two metadata values; a {@code null} value is treated as an empty cluster.
+     * @param oldValue metadata before the change, may be {@code null}
+     * @param newValue metadata after the change, may be {@code null}
+     */
+    public ClusterMetadataDiff(ClusterMetadata oldValue, ClusterMetadata newValue) {
+        this.oldValue = oldValue;
+        this.newValue = newValue;
+        Set<ControllerNode> before = oldValue == null ? ImmutableSet.of() : ImmutableSet.copyOf(oldValue.getNodes());
+        Set<ControllerNode> after = newValue == null ? ImmutableSet.of() : ImmutableSet.copyOf(newValue.getNodes());
+        nodesAdded = Sets.difference(after, before);
+        nodesRemoved = Sets.difference(before, after).stream()
+                           .map(ControllerNode::id).collect(Collectors.toSet());
+    }
+
+    /**
+     * Returns the set of {@link ControllerNode nodes} added with this metadata change.
+     * @return set of controller nodes
+     */
+    public Set<ControllerNode> nodesAdded() {
+        return nodesAdded;
+    }
+
+    /**
+     * Returns the identifiers of the {@link ControllerNode nodes} removed with this metadata change.
+     * @return set of controller node identifiers
+     */
+    public Set<NodeId> nodesRemoved() {
+        return nodesRemoved;
+    }
+
+    /**
+     * Returns a mapping of all partition diffs; a {@code null} metadata value contributes no partitions.
+     * @return partition diffs keyed by partition identifier
+     * @throws IllegalStateException if the old and new values do not hold the same partition identifiers
+     */
+    public Map<PartitionId, PartitionDiff> partitionDiffs() {
+        Map<PartitionId, Partition> oldPartitions = Maps.newHashMap();
+        if (oldValue != null) {
+            oldValue.getPartitions().forEach(p -> oldPartitions.put(p.getId(), p));
+        }
+        Map<PartitionId, Partition> newPartitions = Maps.newHashMap();
+        if (newValue != null) {
+            newValue.getPartitions().forEach(p -> newPartitions.put(p.getId(), p));
+        }
+        checkState(oldPartitions.keySet().equals(newPartitions.keySet()), "Number of partitions cannot change");
+        Map<PartitionId, PartitionDiff> partitionDiffs = Maps.newHashMap();
+        oldPartitions.forEach((k, v) -> partitionDiffs.put(k, new PartitionDiff(v, newPartitions.get(k))));
+        return partitionDiffs;
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/cluster/PartitionDiff.java b/core/api/src/main/java/org/onosproject/cluster/PartitionDiff.java
new file mode 100644
index 0000000..aaa67de
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/PartitionDiff.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * Utility for examining differences between two {@link Partition partition} values.
+ */
+public class PartitionDiff {
+
+    private final Partition oldValue;
+    private final Partition newValue;
+    private final PartitionId partitionId;
+    private final Set<NodeId> currentMembers;
+    private final Set<NodeId> newMembers;
+
+    /**
+     * Creates a diff of two partition values; a {@code null} value is treated as having no members.
+     * @param oldValue partition before the change, may be {@code null}
+     * @param newValue partition after the change, may be {@code null}
+     */
+    public PartitionDiff(Partition oldValue, Partition newValue) {
+        this.oldValue = oldValue;
+        this.newValue = newValue;
+        this.partitionId = oldValue != null ? oldValue.getId() : (newValue != null ? newValue.getId() : null);
+        this.currentMembers = oldValue == null ? ImmutableSet.of() : ImmutableSet.copyOf(oldValue.getMembers());
+        this.newMembers = newValue == null ? ImmutableSet.of() : ImmutableSet.copyOf(newValue.getMembers());
+    }
+
+    /**
+     * Returns the identifier of the partition being compared.
+     * @return partition id taken from the old value, else from the new value; {@code null} if both are absent
+     */
+    public PartitionId partitionId() {
+        return partitionId;
+    }
+
+    /**
+     * Returns the old partition value.
+     * @return partition
+     */
+    public Partition oldValue() {
+        return oldValue;
+    }
+
+    /**
+     * Returns the new partition value.
+     * @return partition
+     */
+    public Partition newValue() {
+        return newValue;
+    }
+
+    /**
+     * Returns if the set of members differs between the two values.
+     * @return {@code true} if yes; {@code false} otherwise
+     */
+    public boolean hasChanged() {
+        return !Sets.symmetricDifference(currentMembers, newMembers).isEmpty();
+    }
+
+    /**
+     * Returns if the specified node is introduced in the new value.
+     * @param nodeId node identifier
+     * @return {@code true} if yes; {@code false} otherwise
+     */
+    public boolean isAdded(NodeId nodeId) {
+        return !currentMembers.contains(nodeId) && newMembers.contains(nodeId);
+    }
+
+    /**
+     * Returns if the specified node is removed in the new value.
+     * @param nodeId node identifier
+     * @return {@code true} if yes; {@code false} otherwise
+     */
+    public boolean isRemoved(NodeId nodeId) {
+        return currentMembers.contains(nodeId) && !newMembers.contains(nodeId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(oldValue, newValue);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof PartitionDiff)) { // instanceof is already false for null
+            return false;
+        }
+        PartitionDiff that = (PartitionDiff) other;
+        return Objects.equals(this.oldValue, that.oldValue) && Objects.equals(this.newValue, that.newValue);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass()).add("oldValue", oldValue).add("newValue", newValue).toString();
+    }
+}
diff --git a/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataDiffTest.java b/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataDiffTest.java
new file mode 100644
index 0000000..32739c4
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataDiffTest.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+import org.onlab.packet.IpAddress;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * Unit tests for ClusterMetadataDiff.
+ */
+public class ClusterMetadataDiffTest {
+
+    @Test
+    public void testDiffNoChange() { // identical metadata: no node or membership changes expected
+        PartitionId pid1 = PartitionId.from(1);
+        NodeId nid1 = NodeId.nodeId("10.0.0.1");
+        ControllerNode n1 = new DefaultControllerNode(nid1, IpAddress.valueOf("10.0.0.1"), 9876);
+        Partition p1 = new DefaultPartition(pid1, ImmutableSet.of(nid1));
+        ClusterMetadata md1 = new ClusterMetadata("foo", ImmutableSet.of(n1), ImmutableSet.of(p1));
+        ClusterMetadataDiff diff = new ClusterMetadataDiff(md1, md1);
+        assertTrue(diff.nodesAdded().isEmpty());
+        assertTrue(diff.nodesRemoved().isEmpty());
+        assertEquals(1, diff.partitionDiffs().size());
+        assertEquals(Sets.newHashSet(pid1), diff.partitionDiffs().keySet());
+        PartitionDiff pdiff = diff.partitionDiffs().get(pid1);
+        assertFalse(pdiff.hasChanged());
+    }
+
+    @Test
+    public void testDiffForScaleUp() { // one node joins the cluster and the single partition
+        PartitionId pid1 = PartitionId.from(1);
+        NodeId nid1 = NodeId.nodeId("10.0.0.1");
+        NodeId nid2 = NodeId.nodeId("10.0.0.2");
+        ControllerNode n1 = new DefaultControllerNode(nid1, IpAddress.valueOf("10.0.0.1"), 9876);
+        ControllerNode n2 = new DefaultControllerNode(nid2, IpAddress.valueOf("10.0.0.2"), 9876);
+        Partition p1 = new DefaultPartition(pid1, ImmutableSet.of(nid1));
+        Partition p12 = new DefaultPartition(pid1, ImmutableSet.of(nid1, nid2));
+        ClusterMetadata md1 = new ClusterMetadata("foo", ImmutableSet.of(n1), ImmutableSet.of(p1));
+        ClusterMetadata md12 = new ClusterMetadata("foo", ImmutableSet.of(n1, n2), ImmutableSet.of(p12));
+        ClusterMetadataDiff diff = new ClusterMetadataDiff(md1, md12);
+        assertEquals(Sets.newHashSet(n2), diff.nodesAdded());
+        assertTrue(diff.nodesRemoved().isEmpty());
+        assertEquals(1, diff.partitionDiffs().size());
+        assertEquals(Sets.newHashSet(pid1), diff.partitionDiffs().keySet());
+        PartitionDiff pdiff = diff.partitionDiffs().get(pid1);
+        assertTrue(pdiff.hasChanged());
+        assertFalse(pdiff.isAdded(nid1));
+        assertTrue(pdiff.isAdded(nid2));
+        assertFalse(pdiff.isRemoved(nid1));
+        assertFalse(pdiff.isRemoved(nid2)); // an added member must not also be reported as removed
+    }
+
+    @Test
+    public void testDiffForScaleDown() { // one node leaves the cluster and the single partition
+        PartitionId pid1 = PartitionId.from(1);
+        NodeId nid1 = NodeId.nodeId("10.0.0.1");
+        NodeId nid2 = NodeId.nodeId("10.0.0.2");
+        ControllerNode n1 = new DefaultControllerNode(nid1, IpAddress.valueOf("10.0.0.1"), 9876);
+        ControllerNode n2 = new DefaultControllerNode(nid2, IpAddress.valueOf("10.0.0.2"), 9876);
+        Partition p1 = new DefaultPartition(pid1, ImmutableSet.of(nid1));
+        Partition p12 = new DefaultPartition(pid1, ImmutableSet.of(nid1, nid2));
+        ClusterMetadata md1 = new ClusterMetadata("foo", ImmutableSet.of(n1), ImmutableSet.of(p1));
+        ClusterMetadata md12 = new ClusterMetadata("foo", ImmutableSet.of(n1, n2), ImmutableSet.of(p12));
+        ClusterMetadataDiff diff = new ClusterMetadataDiff(md12, md1);
+        assertEquals(Sets.newHashSet(nid2), diff.nodesRemoved());
+        assertTrue(diff.nodesAdded().isEmpty());
+        assertEquals(1, diff.partitionDiffs().size());
+        assertEquals(Sets.newHashSet(pid1), diff.partitionDiffs().keySet());
+        PartitionDiff pdiff = diff.partitionDiffs().get(pid1);
+        assertTrue(pdiff.hasChanged());
+        assertTrue(pdiff.isRemoved(nid2));
+        assertFalse(pdiff.isAdded(nid2));
+        assertFalse(pdiff.isRemoved(nid1));
+        assertFalse(pdiff.isAdded(nid1));
+    }
+}
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
index 1dcff08..904fdff 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
@@ -29,6 +29,9 @@
 import org.onosproject.cluster.ClusterEventListener;
 import org.onosproject.cluster.ClusterMetadata;
 import org.onosproject.cluster.ClusterMetadataAdminService;
+import org.onosproject.cluster.ClusterMetadataDiff;
+import org.onosproject.cluster.ClusterMetadataEvent;
+import org.onosproject.cluster.ClusterMetadataEventListener;
 import org.onosproject.cluster.ClusterMetadataService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ClusterStore;
@@ -50,6 +53,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -83,18 +87,21 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected SystemService systemService;
 
+    private final AtomicReference<ClusterMetadata> currentMetadata = new AtomicReference<>();
+    private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
+
     @Activate
     public void activate() {
         store.setDelegate(delegate);
         eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
-        clusterMetadataService.getClusterMetadata()
-                              .getNodes()
-                              .forEach(node -> store.addNode(node.id(), node.ip(), node.tcpPort()));
+        clusterMetadataService.addListener(metadataListener); // listener forwards each event to processMetadata
+        processMetadata(clusterMetadataService.getClusterMetadata()); // apply the metadata available at activation
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        clusterMetadataService.removeListener(metadataListener);
         store.unsetDelegate(delegate);
         eventDispatcher.removeSink(ClusterEvent.class);
         log.info("Stopped");
@@ -190,4 +197,25 @@
         }
         return partitions;
     }
+
+    /**
+     * Applies the given metadata to the cluster: nodes it introduces are added and nodes it drops are removed.
+     * @param metadata cluster metadata to apply; it is recorded as current even when applying it throws
+     */
+    private synchronized void processMetadata(ClusterMetadata metadata) {
+        try {
+            ClusterMetadataDiff diff = new ClusterMetadataDiff(currentMetadata.get(), metadata);
+            diff.nodesAdded().forEach(added -> addNode(added.id(), added.ip(), added.tcpPort()));
+            diff.nodesRemoved().forEach(removedId -> removeNode(removedId));
+        } finally {
+            currentMetadata.set(metadata);
+        }
+    }
+
+    private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
+        @Override
+        public void event(ClusterMetadataEvent event) {
+            processMetadata(event.subject()); // apply the metadata carried by the event
+        }
+    }
 }