ConsistentDeviceMastership on top of LeadershipService, and leaders CLI command
modified to filter on topic. This does not support changing candidate ordering
(yet).
Refernce: ONOS-76
Change-Id: I028a6df0acbe3c4e4ff7c228f687f640e48e13be
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 303129d..ed610ad 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -362,8 +362,8 @@
Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
final MutableBoolean updated = new MutableBoolean(false);
candidateBoard.compute(path, (k, current) -> {
- if (current != null && current.epoch() == newInfo.epoch()) {
- log.info("updating candidateboard with {}", newInfo);
+ if (current != null && current.epoch() <= newInfo.epoch()) {
+ log.info("updating candidateboard with removal of {}", newInfo);
updated.setTrue();
if (candidates.isEmpty()) {
return null;
@@ -452,7 +452,7 @@
});
} else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
leaderBoard.compute(topic, (k, currentLeadership) -> {
- if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
+ if (currentLeadership == null || currentLeadership.epoch() == leadershipUpdate.epoch()) {
updateAccepted.setTrue();
return null;
}
@@ -462,6 +462,9 @@
candidateBoard.compute(topic, (k, currentInfo) -> {
if (currentInfo == null || currentInfo.epoch() <= leadershipUpdate.epoch()) {
updateAccepted.setTrue();
+ if (leadershipUpdate.candidates().isEmpty()) {
+ return null;
+ }
return leadershipUpdate;
}
return currentInfo;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 8a54502..27a33b3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -16,6 +16,7 @@
package org.onosproject.store.mastership.impl;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onlab.util.Tools.futureGetOrElse;
import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
import static org.slf4j.LoggerFactory.getLogger;
@@ -25,12 +26,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -182,12 +179,12 @@
return MastershipRole.NONE;
}
}
- MastershipRole role = complete(clusterCommunicator.sendAndReceive(
+ MastershipRole role = futureGetOrElse(clusterCommunicator.sendAndReceive(
deviceId,
ROLE_QUERY_SUBJECT,
SERIALIZER::encode,
SERIALIZER::decode,
- nodeId));
+ nodeId), null);
return role == null ? MastershipRole.NONE : role;
}
@@ -270,12 +267,12 @@
if (!nodeId.equals(localNodeId)) {
log.debug("Forwarding request to relinquish "
+ "role for device {} to {}", deviceId, nodeId);
- return complete(clusterCommunicator.sendAndReceive(
+ return futureGetOrElse(clusterCommunicator.sendAndReceive(
deviceId,
ROLE_RELINQUISH_SUBJECT,
SERIALIZER::encode,
SERIALIZER::decode,
- nodeId));
+ nodeId), null);
}
// Check if this node is can be managed by this node.
@@ -374,16 +371,4 @@
return m.matches();
}
- private <T> T complete(Future<byte[]> future) {
- try {
- return SERIALIZER.decode(future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.error("Interrupted while waiting for operation to complete.", e);
- return null;
- } catch (TimeoutException | ExecutionException e) {
- log.error("Failed remote operation", e);
- return null;
- }
- }
}
\ No newline at end of file