Fix StoragePartition to return a furture for opening partition client + Fixes in AtomixLeaderElector
Change-Id: I6adf91e84cc17aec8acc895884dc8fbe75037978
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 3b142a7..66013c0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -50,6 +50,7 @@
private final MessagingService messagingService;
private final ClusterService clusterService;
private final File logFolder;
+ private CompletableFuture<StoragePartitionServer> serverOpenFuture;
private static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
new ResourceType(DistributedLong.class),
new ResourceType(AtomixLeaderElector.class),
@@ -90,9 +91,9 @@
@Override
public CompletableFuture<Void> open() {
- return openServer().thenAccept(s -> server = Optional.ofNullable(s))
- .thenCompose(v-> openClient())
- .thenAccept(v -> isOpened.set(true))
+ serverOpenFuture = openServer();
+ serverOpenFuture.thenAccept(s -> server = Optional.ofNullable(s));
+ return openClient().thenAccept(v -> isOpened.set(true))
.thenApply(v -> null);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index b8c5afa..ce8c582 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -44,7 +44,7 @@
private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Sets.newConcurrentHashSet();
- public static final String CHANGE_SUBJECT = "changeEvents";
+ public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
private Listener<Change<Leadership>> listener;
public AtomixLeaderElector(CopycatClient client, Resource.Options options) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
index 900fef2..ea1b3ce 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
@@ -298,6 +298,11 @@
private String topic;
private NodeId nodeId;
+ ElectionChangeCommand() {
+ topic = null;
+ nodeId = null;
+ }
+
public ElectionChangeCommand(String topic, NodeId nodeId) {
this.topic = topic;
this.nodeId = nodeId;
@@ -347,6 +352,10 @@
*/
@SuppressWarnings("serial")
public static class Anoint extends ElectionChangeCommand<Boolean> {
+
+ private Anoint() {
+ }
+
public Anoint(String topic, NodeId nodeId) {
super(topic, nodeId);
}
@@ -357,6 +366,10 @@
*/
@SuppressWarnings("serial")
public static class Promote extends ElectionChangeCommand<Boolean> {
+
+ private Promote() {
+ }
+
public Promote(String topic, NodeId nodeId) {
super(topic, nodeId);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
index 0621042..d09092a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -202,9 +202,10 @@
public boolean anoint(Commit<? extends Anoint> commit) {
try {
String topic = commit.operation().topic();
+ NodeId nodeId = commit.operation().nodeId();
Leadership oldLeadership = leadership(topic);
ElectionState electionState = elections.computeIfPresent(topic,
- (k, v) -> new ElectionState(v).transferLeadership(commit.operation().nodeId(), termCounter(topic)));
+ (k, v) -> v.transferLeadership(nodeId, termCounter(topic)));
Leadership newLeadership = leadership(topic);
if (!Objects.equal(oldLeadership, newLeadership)) {
notifyLeadershipChange(oldLeadership, newLeadership);
@@ -230,7 +231,7 @@
if (oldLeadership == null || !oldLeadership.candidates().contains(nodeId)) {
return false;
}
- elections.computeIfPresent(topic, (k, v) -> new ElectionState(v).promote(commit.operation().nodeId()));
+ elections.computeIfPresent(topic, (k, v) -> v.promote(nodeId));
Leadership newLeadership = leadership(topic);
if (!Objects.equal(oldLeadership, newLeadership)) {
notifyLeadershipChange(oldLeadership, newLeadership);
@@ -498,7 +499,7 @@
.filter(r -> r.nodeId().equals(nodeId))
.findFirst()
.orElse(null);
- List<Registration> updatedRegistrations = Lists.newLinkedList();
+ List<Registration> updatedRegistrations = Lists.newArrayList();
updatedRegistrations.add(registration);
registrations.stream()
.filter(r -> !r.nodeId().equals(nodeId))