[ONOS-7081] Modify EventuallyConsistentMap to bootstrap from old nodes during upgrade
Change-Id: I0bb60bf64319e083117b6d0f6f82842640e5185f
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index b73dd61..62a29bf 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -15,13 +15,6 @@
*/
package org.onosproject.store.primitives.impl;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -30,6 +23,10 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.Member;
+import org.onosproject.cluster.MembershipService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.PartitionId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
@@ -65,6 +62,13 @@
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
import static org.slf4j.LoggerFactory.getLogger;
@@ -95,6 +99,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PartitionAdminService partitionAdminService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MembershipService membershipService; // source of the member list used to pick peers for eventually consistent maps
+
private final Supplier<TransactionId> transactionIdGenerator =
() -> TransactionId.from(UUID.randomUUID().toString());
private DistributedPrimitiveCreator federatedPrimitiveCreator;
@@ -119,9 +126,40 @@
@Override
public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
checkPermission(STORAGE_WRITE);
- return new EventuallyConsistentMapBuilderImpl<>(clusterService,
+ final NodeId localNodeId = clusterService.getLocalNode().id(); // this node's id; excluded from both peer lists
+
+ Supplier<List<NodeId>> peersSupplier = () -> membershipService.getMembers().stream() // re-evaluated on every get()
+ .map(Member::nodeId)
+ .filter(nodeId -> !nodeId.equals(localNodeId))
+ .filter(id -> clusterService.getState(id).isActive()) // keep only nodes the cluster service reports as active
+ .collect(Collectors.toList());
+
+ Supplier<List<NodeId>> bootstrapPeersSupplier = () -> {
+ if (membershipService.getMembers().size() == 1) { // single member (presumably only this node, mid-upgrade - confirm)
+ return clusterService.getNodes() // fall back to every cluster node instead of the member list
+ .stream()
+ .map(ControllerNode::id)
+ .filter(id -> !localNodeId.equals(id))
+ .filter(id -> clusterService.getState(id).isActive())
+ .collect(Collectors.toList());
+ } else { // otherwise: same selection as peersSupplier (active members other than this node)
+ return membershipService.getMembers()
+ .stream()
+ .map(Member::nodeId)
+ .filter(id -> !localNodeId.equals(id))
+ .filter(id -> clusterService.getState(id).isActive())
+ .collect(Collectors.toList());
+ }
+ };
+
+
+ return new EventuallyConsistentMapBuilderImpl<>(
+ localNodeId,
clusterCommunicator,
- persistenceService);
+ persistenceService,
+ peersSupplier, // passed as suppliers, so peer lists are computed when the builder/map asks for them
+ bootstrapPeersSupplier
+ );
}
@Override