PartitionManager support for reacting to cluster metadata changes

Change-Id: I65e358f5cb47e9420fae9589661ba0ce45f58df6
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/PartitionAdminService.java b/core/api/src/main/java/org/onosproject/store/primitives/PartitionAdminService.java
index 04d8e99..241c384 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/PartitionAdminService.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/PartitionAdminService.java
@@ -16,9 +16,7 @@
 package org.onosproject.store.primitives;
 
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 
-import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.service.PartitionInfo;
 
 /**
@@ -31,20 +29,4 @@
      * @return list of {@code PartitionInfo}
      */
     List<PartitionInfo> partitionInfo();
-
-    /**
-     * Leaves a partition.
-     *
-     * @param partitionId partition identifier
-     * @return future that is completed when the operation completes.
-     */
-    CompletableFuture<Void> leave(PartitionId partitionId);
-
-    /**
-     * Joins a partition.
-     *
-     * @param partitionId partition identifier
-     * @return future that is completed when the operation completes.
-     */
-    CompletableFuture<Void> join(PartitionId partitionId);
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 45bd171..a083a8b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -32,9 +33,14 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterMetadata;
+import org.onosproject.cluster.ClusterMetadataDiff;
+import org.onosproject.cluster.ClusterMetadataEvent;
+import org.onosproject.cluster.ClusterMetadataEventListener;
 import org.onosproject.cluster.ClusterMetadataService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.PartitionDiff;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.store.cluster.messaging.MessagingService;
@@ -68,15 +74,19 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
-    Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
+    private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
+    private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
+    private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
 
     @Activate
     public void activate() {
         eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
-
-        metadataService.getClusterMetadata()
+        currentClusterMetadata.set(metadataService.getClusterMetadata());
+        metadataService.addListener(metadataListener);
+        currentClusterMetadata.get()
                        .getPartitions()
                        .stream()
+                       .filter(partition -> !partition.getId().equals(PartitionId.from(0))) // exclude p0
                        .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
                                messagingService,
                                clusterService,
@@ -93,6 +103,7 @@
 
     @Deactivate
     public void deactivate() {
+        metadataService.removeListener(metadataListener);
         eventDispatcher.removeSink(PartitionEvent.class);
 
         CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
@@ -104,20 +115,6 @@
     }
 
     @Override
-    public CompletableFuture<Void> leave(PartitionId partitionId) {
-        return partitions.get(partitionId)
-                         .server()
-                         .map(server -> server.close())
-                         .orElse(CompletableFuture.completedFuture(null));
-    }
-
-    @Override
-    public CompletableFuture<Void> join(PartitionId partitionId) {
-        return partitions.get(partitionId)
-                         .open();
-    }
-
-    @Override
     public int getNumberOfPartitions() {
         return partitions.size();
     }
@@ -152,4 +149,23 @@
                          .flatMap(x -> Tools.stream(x.info()))
                          .collect(Collectors.toList());
     }
+
+    private void processMetadataUpdate(ClusterMetadata clusterMetadata) {
+        ClusterMetadataDiff diffExaminer =
+                new ClusterMetadataDiff(currentClusterMetadata.get(), clusterMetadata);
+        diffExaminer.partitionDiffs()
+                    .values()
+                    .stream()
+                    // TODO: Remove after partition 0 is removed from cluster metadata.
+                    .filter(diff -> !diff.partitionId().equals(PartitionId.from(0)))
+                    .filter(PartitionDiff::hasChanged)
+                    .forEach(diff -> partitions.get(diff.partitionId()).onUpdate(diff.newValue()));
+    }
+
+    private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
+        @Override
+        public void event(ClusterMetadataEvent event) {
+            processMetadataUpdate(event.subject());
+        }
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 66013c0..5cdc5a9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -28,9 +28,9 @@
 
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.DefaultPartition;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.Partition;
+import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.cluster.messaging.MessagingService;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
@@ -42,7 +42,7 @@
 /**
  * Storage partition.
  */
-public class StoragePartition extends DefaultPartition implements Managed<StoragePartition> {
+public class StoragePartition implements Managed<StoragePartition> {
 
     private final AtomicBoolean isOpened = new AtomicBoolean(false);
     private final AtomicBoolean isClosed = new AtomicBoolean(false);
@@ -50,14 +50,14 @@
     private final MessagingService messagingService;
     private final ClusterService clusterService;
     private final File logFolder;
-    private CompletableFuture<StoragePartitionServer> serverOpenFuture;
+    private Partition partition;
     private static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
                                                         new ResourceType(DistributedLong.class),
                                                         new ResourceType(AtomixLeaderElector.class),
                                                         new ResourceType(AtomixConsistentMap.class));
 
     private NodeId localNodeId;
-    private Optional<StoragePartitionServer> server = Optional.empty();
+    private StoragePartitionServer server;
     private StoragePartitionClient client;
 
     public StoragePartition(Partition partition,
@@ -65,7 +65,7 @@
             ClusterService clusterService,
             Serializer serializer,
             File logFolder) {
-        super(partition);
+        this.partition = partition;
         this.messagingService = messagingService;
         this.clusterService = clusterService;
         this.localNodeId = clusterService.getLocalNode().id();
@@ -81,61 +81,86 @@
         return client;
     }
 
-    /**
-     * Returns the optional server instance.
-     * @return server
-     */
-    public Optional<StoragePartitionServer> server() {
-        return server;
-    }
-
     @Override
     public CompletableFuture<Void> open() {
-        serverOpenFuture = openServer();
-        serverOpenFuture.thenAccept(s -> server = Optional.ofNullable(s));
+        openServer();
         return openClient().thenAccept(v -> isOpened.set(true))
                            .thenApply(v -> null);
     }
 
     @Override
     public CompletableFuture<Void> close() {
-        return closeClient().thenCompose(v -> closeServer())
-                            .thenAccept(v -> isClosed.set(true))
+        // We do not explicitly close the server and instead let the cluster
+        // deal with this as an unclean exit.
+        return closeClient().thenAccept(v -> isClosed.set(true))
                             .thenApply(v -> null);
     }
 
-    public Collection<Address> getMemberAddresses() {
-        return Collections2.transform(getMembers(), this::toAddress);
+    /**
+     * Returns the identifier of the {@link Partition partition} associated with this instance.
+     * @return partition identifier
+     */
+    public PartitionId getId() {
+        return partition.getId();
     }
 
-    private CompletableFuture<StoragePartitionServer> openServer() {
-        if (!getMembers().contains(localNodeId)) {
+    /**
+     * Returns the identifiers of partition members.
+     * @return partition member instance ids
+     */
+    public Collection<NodeId> getMembers() {
+        return partition.getMembers();
+    }
+
+    /**
+     * Returns the {@link Address addresses} of partition members.
+     * @return partition member addresses
+     */
+    public Collection<Address> getMemberAddresses() {
+        return Collections2.transform(partition.getMembers(), this::toAddress);
+    }
+
+    private CompletableFuture<Void> openServer() {
+        if (!partition.getMembers().contains(localNodeId) || server != null) {
             return CompletableFuture.completedFuture(null);
         }
         StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
                 this,
                 serializer,
                 () -> new CopycatTransport(CopycatTransport.Mode.SERVER,
-                                     getId(),
+                                     partition.getId(),
                                      messagingService),
                 RESOURCE_TYPES,
                 logFolder);
-        return server.open().thenApply(v -> server);
+        return server.open().thenRun(() -> this.server = server);
     }
 
     private CompletableFuture<StoragePartitionClient> openClient() {
         client = new StoragePartitionClient(this,
                 serializer,
                 new CopycatTransport(CopycatTransport.Mode.CLIENT,
-                                     getId(),
+                                     partition.getId(),
                                      messagingService),
                 RESOURCE_TYPES);
         return client.open().thenApply(v -> client);
     }
 
-    private CompletableFuture<Void> closeServer() {
-        return server.map(StoragePartitionServer::close)
-                .orElse(CompletableFuture.completedFuture(null));
+    /**
+     * Closes the partition server if it was previously opened.
+     * @return future that is completed when the operation completes
+     */
+    public CompletableFuture<Void> closeServer() {
+        return server != null ? server.closeAndExit() : CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public boolean isOpen() {
+        return isOpened.get() && !isClosed.get();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return isClosed.get();
     }
 
     private CompletableFuture<Void> closeClient() {
@@ -150,22 +175,21 @@
         return new Address(node.ip().toString(), node.tcpPort());
     }
 
-    @Override
-    public boolean isOpen() {
-        return !isClosed.get() && isOpened.get();
-    }
-
-    @Override
-    public boolean isClosed() {
-        return isOpened.get() && isClosed.get();
-    }
-
     /**
      * Returns the partition information if this partition is locally managed i.e.
      * this node is a active member of the partition.
      * @return partition info
      */
     public Optional<PartitionInfo> info() {
-        return server.map(StoragePartitionServer::info);
+        return server != null ? Optional.of(server.info()) : Optional.empty();
+    }
+
+    public void onUpdate(Partition partition) {
+        this.partition = partition;
+        if (partition.getMembers().contains(localNodeId)) {
+            openServer();
+        } else if (!partition.getMembers().contains(localNodeId)) {
+            closeServer();
+        }
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index e6669dc..6d613c3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -31,16 +31,13 @@
 
 import java.io.File;
 import java.util.Collection;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
-import org.onosproject.cluster.NodeId;
 import org.onosproject.store.service.PartitionInfo;
 import org.slf4j.Logger;
 
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
 
 /**
  * {@link StoragePartition} server.
@@ -80,7 +77,7 @@
                 return CompletableFuture.completedFuture(null);
             }
             synchronized (this) {
-                server = server();
+                server = buildServer();
             }
             serverOpenFuture = server.open();
         } else {
@@ -97,13 +94,22 @@
 
     @Override
     public CompletableFuture<Void> close() {
-        // We do not close the server because doing so is equivalent to this node
-        // leaving the cluster and we don't want that here.
-        // The Raft protocol should take care of servers leaving unannounced.
-        return CompletableFuture.completedFuture(null);
+        /**
+         * CopycatServer#kill just shuts down the server and does not result
+         * in any cluster membership changes.
+         */
+        return server.kill();
     }
 
-    private CopycatServer server() {
+    /**
+     * Closes the server and exits the partition.
+     * @return future that is completed when the operation is complete
+     */
+    public CompletableFuture<Void> closeAndExit() {
+        return server.close();
+    }
+
+    private CopycatServer buildServer() {
         ResourceTypeResolver resourceResolver = new ServiceLoaderResourceResolver();
         ResourceRegistry registry = new ResourceRegistry();
         resourceTypes.forEach(registry::register);
@@ -124,10 +130,6 @@
         return server;
     }
 
-    public Set<NodeId> configuredMembers() {
-        return Sets.newHashSet(partition.getMembers());
-    }
-
     @Override
     public boolean isOpen() {
         return server.isOpen();