PartitionManager support for reacting to cluster metadata changes

Change-Id: I65e358f5cb47e9420fae9589661ba0ce45f58df6
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 45bd171..a083a8b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -32,9 +33,14 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterMetadata;
+import org.onosproject.cluster.ClusterMetadataDiff;
+import org.onosproject.cluster.ClusterMetadataEvent;
+import org.onosproject.cluster.ClusterMetadataEventListener;
 import org.onosproject.cluster.ClusterMetadataService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.PartitionDiff;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.store.cluster.messaging.MessagingService;
@@ -68,15 +74,19 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
-    Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
+    private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
+    private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
+    private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
 
     @Activate
     public void activate() {
         eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
-
-        metadataService.getClusterMetadata()
+        currentClusterMetadata.set(metadataService.getClusterMetadata());
+        metadataService.addListener(metadataListener);
+        currentClusterMetadata.get()
                        .getPartitions()
                        .stream()
+                       .filter(partition -> !partition.getId().equals(PartitionId.from(0))) // exclude p0
                        .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
                                messagingService,
                                clusterService,
@@ -93,6 +103,7 @@
 
     @Deactivate
     public void deactivate() {
+        metadataService.removeListener(metadataListener);
         eventDispatcher.removeSink(PartitionEvent.class);
 
         CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
@@ -104,20 +115,6 @@
     }
 
     @Override
-    public CompletableFuture<Void> leave(PartitionId partitionId) {
-        return partitions.get(partitionId)
-                         .server()
-                         .map(server -> server.close())
-                         .orElse(CompletableFuture.completedFuture(null));
-    }
-
-    @Override
-    public CompletableFuture<Void> join(PartitionId partitionId) {
-        return partitions.get(partitionId)
-                         .open();
-    }
-
-    @Override
     public int getNumberOfPartitions() {
         return partitions.size();
     }
@@ -152,4 +149,42 @@
                          .flatMap(x -> Tools.stream(x.info()))
                          .collect(Collectors.toList());
     }
+
+    /**
+     * Applies a cluster metadata update to the locally managed partitions.
+     * <p>
+     * The new metadata is diffed against the last metadata this manager recorded; every
+     * partition other than partition 0 whose diff reports a change is handed its new value.
+     * The new metadata is then recorded so that the next update is diffed against it.
+     * <p>
+     * NOTE(review): assumes every changed partition already has an entry in {@code partitions};
+     * an id missing from that map would raise a NullPointerException here. Confirm with callers.
+     *
+     * @param clusterMetadata the updated cluster metadata
+     */
+    private void processMetadataUpdate(ClusterMetadata clusterMetadata) {
+        ClusterMetadataDiff diffExaminer =
+                new ClusterMetadataDiff(currentClusterMetadata.get(), clusterMetadata);
+        diffExaminer.partitionDiffs()
+                    .values()
+                    .stream()
+                    // TODO: Remove after partition 0 is removed from cluster metadata.
+                    .filter(diff -> !diff.partitionId().equals(PartitionId.from(0)))
+                    .filter(PartitionDiff::hasChanged)
+                    .forEach(diff -> partitions.get(diff.partitionId()).onUpdate(diff.newValue()));
+        // Advance the baseline; otherwise every later event would still be diffed against
+        // the metadata captured at activation.
+        currentClusterMetadata.set(clusterMetadata);
+    }
+
+    /**
+     * Listener that forwards the subject of each cluster metadata event to
+     * {@link #processMetadataUpdate(ClusterMetadata)}.
+     */
+    private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
+        @Override
+        public void event(ClusterMetadataEvent event) {
+            processMetadataUpdate(event.subject());
+        }
+    }
 }