Create local storage for the topic-to-candidates mapping. This change also includes:
- using Optional in Leadership, plus some additional comments.
- using MutableBoolean flags with Map.compute() in place of synchronized blocks.
Part of: Device Mastership store on top of LeadershipService
Reference: ONOS-76
Conflicts:
core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
Change-Id: I7f090abb123cf23bb5126a935a6e72be00f3e3ce
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index 10bf6a4..1d918c8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -576,7 +576,7 @@
}
@Override
- public Map<String, List<NodeId>> getCandidates() {
+ public Map<String, Leadership> getCandidates() { // NOTE(review): the body returns null rather than an empty map; only the return type changes here
return null;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index cf3700b..303129d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -12,6 +12,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.apache.commons.lang3.mutable.MutableBoolean;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
@@ -88,6 +89,7 @@
private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
listenerRegistry;
private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
+ private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
private NodeId localNodeId;
private Set<String> activeTopics = Sets.newConcurrentHashSet();
@@ -164,16 +166,14 @@
}
@Override
- public Map<String, List<NodeId>> getCandidates() {
- Map<String, List<NodeId>> candidates = Maps.newHashMap();
- candidateMap.entrySet().forEach(el -> candidates.put(el.getKey(), el.getValue().value()));
- return ImmutableMap.copyOf(candidates);
+ public Map<String, Leadership> getCandidates() { // returns topic -> Leadership for every entry on the local candidateBoard
+ return ImmutableMap.copyOf(candidateBoard); // immutable copy of the local candidateBoard; candidateMap is no longer read here
}
@Override
public List<NodeId> getCandidates(String path) {
- Versioned<List<NodeId>> candidates = candidateMap.get(path);
- return candidates == null ? ImmutableList.of() : ImmutableList.copyOf(candidates.value());
+ Leadership current = candidateBoard.get(path); // look up the locally stored entry for this topic
+ return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates()); // empty list when the topic has no entry, otherwise an immutable copy
}
@Override
@@ -207,13 +207,21 @@
List<NodeId> candidateList = Lists.newArrayList(candidates.value());
if (!candidateList.contains(localNodeId)) {
candidateList.add(localNodeId);
- if (!candidateMap.replace(path, candidates.version(), candidateList)) {
+ if (candidateMap.replace(path, candidates.version(), candidateList)) {
+ Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
+ notifyCandidateAdded(
+ path, candidateList, newCandidates.version(), newCandidates.creationTime());
+ } else {
rerunForLeadership(path);
return;
}
}
} else {
- if (!(candidateMap.putIfAbsent(path, ImmutableList.of(localNodeId)) == null)) {
+ List<NodeId> candidateList = ImmutableList.of(localNodeId);
+ if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
+ Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
+ notifyCandidateAdded(path, candidateList, newCandidates.version(), newCandidates.creationTime());
+ } else {
rerunForLeadership(path);
return;
}
@@ -247,10 +255,19 @@
if (!candidateList.remove(localNodeId)) {
return;
}
- boolean success = candidateList.isEmpty()
- ? candidateMap.remove(path, candidates.version())
- : candidateMap.replace(path, candidates.version(), candidateList);
- if (!success) {
+ boolean success = false;
+ if (candidateList.isEmpty()) {
+ if (candidateMap.remove(path, candidates.version())) {
+ success = true;
+ }
+ } else {
+ if (candidateMap.replace(path, candidates.version(), candidateList)) {
+ success = true;
+ }
+ }
+ if (success) {
+ notifyCandidateRemoved(path, candidateList, candidates.version(), candidates.creationTime());
+ } else {
log.warn("Failed to withdraw from candidates list. Will retry");
retryWithdraw(path);
}
@@ -321,21 +338,63 @@
}
}
+ private void notifyCandidateAdded( // stores a new candidate list on candidateBoard and, if it was accepted, raises CANDIDATES_CHANGED
+ String path, List<NodeId> candidates, long epoch, long electedTime) { // the visible callers pass the candidateMap entry's version and creation time as epoch and electedTime
+ Leadership newInfo = new Leadership(path, candidates, epoch, electedTime); // built without a leader argument: carries candidates only
+ final MutableBoolean updated = new MutableBoolean(false); // mutable holder so the lambda below can report whether it replaced the entry
+ candidateBoard.compute(path, (k, current) -> {
+ if (current == null || current.epoch() < newInfo.epoch()) { // accept only when no entry exists or the new epoch is strictly greater
+ log.info("updating candidateboard with {}", newInfo);
+ updated.setTrue();
+ return newInfo;
+ }
+ return current; // stale or duplicate update: keep the existing entry
+ });
+ // TODO: maybe rethink the event types used for candidate changes
+ if (updated.booleanValue()) { // notify only when the local board actually changed
+ LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
+ notifyPeers(event);
+ }
+ }
+
+ private void notifyCandidateRemoved( // applies a shrunken candidate list to candidateBoard and, if it was applied, raises CANDIDATES_CHANGED
+ String path, List<NodeId> candidates, long epoch, long electedTime) { // the visible caller passes the version and creation time read before the remove/replace
+ Leadership newInfo = new Leadership(path, candidates, epoch, electedTime); // built without a leader argument: carries candidates only
+ final MutableBoolean updated = new MutableBoolean(false); // mutable holder so the lambda below can report whether it changed the entry
+ candidateBoard.compute(path, (k, current) -> {
+ if (current != null && current.epoch() == newInfo.epoch()) { // apply only to an existing entry with exactly the same epoch
+ log.info("updating candidateboard with {}", newInfo);
+ updated.setTrue();
+ if (candidates.isEmpty()) {
+ return null; // returning null from compute() removes the topic's mapping
+ } else {
+ return newInfo;
+ }
+ }
+ return current; // missing entry or different epoch: leave the board untouched
+ });
+ // TODO: maybe rethink the event types used for candidate changes
+ if (updated.booleanValue()) { // notify only when the local board actually changed
+ LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
+ notifyPeers(event);
+ }
+ }
+
private void notifyNewLeader(String path, NodeId leader,
List<NodeId> candidates, long epoch, long electedTime) {
Leadership newLeadership = new Leadership(path, leader, candidates, epoch, electedTime);
- boolean updatedLeader = false;
+ final MutableBoolean updatedLeader = new MutableBoolean(false);
log.debug("candidates for new Leadership {}", candidates);
- synchronized (leaderBoard) {
- Leadership currentLeader = leaderBoard.get(path);
+ leaderBoard.compute(path, (k, currentLeader) -> {
if (currentLeader == null || currentLeader.epoch() < epoch) {
log.debug("updating leaderboard with new {}", newLeadership);
- leaderBoard.put(path, newLeadership);
- updatedLeader = true;
+ updatedLeader.setTrue();
+ return newLeadership;
}
- }
+ return currentLeader;
+ });
- if (updatedLeader) {
+ if (updatedLeader.booleanValue()) {
LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
notifyPeers(event);
}
@@ -352,21 +411,18 @@
Versioned<List<NodeId>> candidates = candidateMap.get(path);
Leadership oldLeadership = new Leadership(
path, leader, candidates.value(), epoch, electedTime);
- boolean updatedLeader = false;
- synchronized (leaderBoard) {
- Leadership currentLeader = leaderBoard.get(path);
+ final MutableBoolean updatedLeader = new MutableBoolean(false);
+ leaderBoard.compute(path, (k, currentLeader) -> {
if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
- leaderBoard.remove(path);
- updatedLeader = true;
+ updatedLeader.setTrue();
+ return null;
}
- }
+ return currentLeader;
+ });
- if (updatedLeader) {
+ if (updatedLeader.booleanValue()) {
LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
- eventDispatcher.post(event);
- clusterCommunicator.broadcast(event,
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER::encode);
+ notifyPeers(event);
}
}
@@ -385,31 +441,37 @@
LeadershipEvent.Type eventType = leadershipEvent.type();
String topic = leadershipUpdate.topic();
- boolean updateAccepted = false;
-
- synchronized (leaderBoard) {
- Leadership currentLeadership = leaderBoard.get(topic);
- if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
+ MutableBoolean updateAccepted = new MutableBoolean(false);
+ if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
+ leaderBoard.compute(topic, (k, currentLeadership) -> {
if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
- leaderBoard.put(topic, leadershipUpdate);
- updateAccepted = true;
+ updateAccepted.setTrue();
+ return leadershipUpdate;
}
- } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
- if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
- leaderBoard.remove(topic);
- updateAccepted = true;
+ return currentLeadership;
+ });
+ } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) { // a peer reports that a leader was removed for this topic
+ leaderBoard.compute(topic, (k, currentLeadership) -> {
+ if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) { // same rule as before the compute() refactor: only drop the leadership held for exactly this epoch
+ updateAccepted.setTrue();
+ return null; // returning null from compute() removes the topic's mapping
}
- } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
- if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
- leaderBoard.replace(topic, leadershipUpdate);
- updateAccepted = true;
+ return currentLeadership; // no entry or a different epoch: keep the board as it is and do not post the event
+ });
+ } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
+ candidateBoard.compute(topic, (k, currentInfo) -> {
+ if (currentInfo == null || currentInfo.epoch() <= leadershipUpdate.epoch()) {
+ updateAccepted.setTrue();
+ return leadershipUpdate;
}
- } else {
- throw new IllegalStateException("Unknown event type.");
- }
- if (updateAccepted) {
- eventDispatcher.post(leadershipEvent);
- }
+ return currentInfo;
+ });
+ } else {
+ throw new IllegalStateException("Unknown event type.");
+ }
+
+ if (updateAccepted.booleanValue()) {
+ eventDispatcher.post(leadershipEvent);
}
}
}
@@ -470,6 +532,12 @@
SERIALIZER::encode);
}
});
+ candidateBoard.forEach((path, leadership) -> {
+ LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, leadership);
+ clusterCommunicator.broadcast(event,
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER::encode);
+ });
} catch (Exception e) {
log.debug("Failed to send leadership updates", e);
}