ClusterManager support for reacting to cluster metadata changes

Change-Id: I7befaf4f955bda093d89c3c431eae6814409ae03
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
index 1dcff08..904fdff 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
@@ -29,6 +29,9 @@
 import org.onosproject.cluster.ClusterEventListener;
 import org.onosproject.cluster.ClusterMetadata;
 import org.onosproject.cluster.ClusterMetadataAdminService;
+import org.onosproject.cluster.ClusterMetadataDiff;
+import org.onosproject.cluster.ClusterMetadataEvent;
+import org.onosproject.cluster.ClusterMetadataEventListener;
 import org.onosproject.cluster.ClusterMetadataService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ClusterStore;
@@ -50,6 +53,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -83,18 +87,21 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected SystemService systemService;
 
+    private final AtomicReference<ClusterMetadata> currentMetadata = new AtomicReference<>();
+    private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
+
     @Activate
     public void activate() {
         store.setDelegate(delegate);
         eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
-        clusterMetadataService.getClusterMetadata()
-                              .getNodes()
-                              .forEach(node -> store.addNode(node.id(), node.ip(), node.tcpPort()));
+        clusterMetadataService.addListener(metadataListener); // subscribe first, then load the current metadata
+        processMetadata(clusterMetadataService.getClusterMetadata()); // initial pass; later ones come via listener
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        clusterMetadataService.removeListener(metadataListener);
         store.unsetDelegate(delegate);
         eventDispatcher.removeSink(ClusterEvent.class);
         log.info("Stopped");
@@ -190,4 +197,25 @@
         }
         return partitions;
     }
+
+    /**
+     * Diffs {@code metadata} against the last processed metadata, then adds/removes the nodes the diff reports.
+     */
+    private synchronized void processMetadata(ClusterMetadata metadata) { // one metadata update at a time
+        try {
+            ClusterMetadataDiff examiner =
+                    new ClusterMetadataDiff(currentMetadata.get(), metadata); // old side is null on first call
+            examiner.nodesAdded().forEach(node -> addNode(node.id(), node.ip(), node.tcpPort()));
+            examiner.nodesRemoved().forEach(this::removeNode);
+        } finally {
+            currentMetadata.set(metadata); // finally: recorded even when the add/remove above throws
+        }
+    }
+
+    private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
+        @Override
+        public void event(ClusterMetadataEvent event) {
+            processMetadata(event.subject()); // hand the event's metadata to the synchronized reconciler
+        }
+    }
 }