blob: c6647b71dc68f749145cd26f3e26c7beeea34286 [file] [log] [blame]
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipStore;
import org.onosproject.cluster.LeadershipStoreDelegate;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
* Implementation of {@code LeadershipStore} backed by {@link ConsistentMap}.
*/
@Service
@Component(immediate = true, enabled = true)
public class DistributedLeadershipStore
extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
implements LeadershipStore {
private static final Logger log = getLogger(DistributedLeadershipStore.class);
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
protected NodeId localNodeId;
protected ConsistentMap<String, InternalLeadership> leadershipMap;
private final MapEventListener<String, InternalLeadership> leadershipChangeListener =
event -> {
Leadership oldValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.oldValue()));
Leadership newValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.newValue()));
boolean leaderChanged =
!Objects.equal(oldValue == null ? null : oldValue.leader(), newValue.leader());
boolean candidatesChanged =
!Sets.symmetricDifference(Sets.newHashSet(oldValue == null ?
ImmutableSet.<NodeId>of() : oldValue.candidates()),
Sets.newHashSet(newValue.candidates())).isEmpty();
LeadershipEvent.Type eventType = null;
if (leaderChanged && candidatesChanged) {
eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
}
if (leaderChanged && !candidatesChanged) {
eventType = LeadershipEvent.Type.LEADER_CHANGED;
}
if (!leaderChanged && candidatesChanged) {
eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
}
notifyDelegate(new LeadershipEvent(eventType, newValue));
};
@Activate
public void activate() {
localNodeId = clusterService.getLocalNode().id();
leadershipMap = storageService.<String, InternalLeadership>consistentMapBuilder()
.withName("onos-leadership")
.withPartitionsDisabled()
.withRelaxedReadConsistency()
.withSerializer(Serializer.using(KryoNamespaces.API, InternalLeadership.class))
.build();
leadershipMap.addListener(leadershipChangeListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
leadershipMap.removeListener(leadershipChangeListener);
log.info("Stopped");
}
@Override
public Leadership addRegistration(String topic) {
Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
v -> v == null || !v.candidates().contains(localNodeId),
(k, v) -> {
if (v == null || v.candidates().isEmpty()) {
return new InternalLeadership(topic,
localNodeId,
v == null ? 1 : v.term() + 1,
System.currentTimeMillis(),
ImmutableList.of(localNodeId));
}
List<NodeId> newCandidates = new ArrayList<>(v.candidates());
newCandidates.add(localNodeId);
return new InternalLeadership(topic, v.leader(), v.term(), v.termStartTime(), newCandidates);
});
return InternalLeadership.toLeadership(Versioned.valueOrNull(internalLeadership));
}
@Override
public void removeRegistration(String topic) {
removeRegistration(topic, localNodeId);
}
private void removeRegistration(String topic, NodeId nodeId) {
leadershipMap.computeIf(topic,
v -> v != null && v.candidates().contains(nodeId),
(k, v) -> {
List<NodeId> newCandidates = v.candidates()
.stream()
.filter(id -> !nodeId.equals(id))
.collect(Collectors.toList());
NodeId newLeader = nodeId.equals(v.leader()) ?
newCandidates.size() > 0 ? newCandidates.get(0) : null : v.leader();
long newTerm = newLeader == null || Objects.equal(newLeader, v.leader()) ?
v.term() : v.term() + 1;
long newTermStartTime = newLeader == null || Objects.equal(newLeader, v.leader()) ?
v.termStartTime() : System.currentTimeMillis();
return new InternalLeadership(topic, newLeader, newTerm, newTermStartTime, newCandidates);
});
}
@Override
public void removeRegistration(NodeId nodeId) {
leadershipMap.entrySet()
.stream()
.filter(e -> e.getValue().value().candidates().contains(nodeId))
.map(e -> e.getKey())
.forEach(topic -> this.removeRegistration(topic, nodeId));
}
@Override
public boolean moveLeadership(String topic, NodeId toNodeId) {
Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
v -> v != null &&
v.candidates().contains(toNodeId) &&
!Objects.equal(v.leader(), toNodeId),
(k, v) -> {
List<NodeId> newCandidates = new ArrayList<>();
newCandidates.add(toNodeId);
newCandidates.addAll(v.candidates()
.stream()
.filter(id -> !toNodeId.equals(id))
.collect(Collectors.toList()));
return new InternalLeadership(topic,
toNodeId,
v.term() + 1,
System.currentTimeMillis(),
newCandidates);
});
return Objects.equal(toNodeId, Versioned.valueOrNull(internalLeadership).leader());
}
@Override
public boolean makeTopCandidate(String topic, NodeId nodeId) {
Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
v -> v != null &&
v.candidates().contains(nodeId) &&
!v.candidates().get(0).equals(nodeId),
(k, v) -> {
List<NodeId> newCandidates = new ArrayList<>();
newCandidates.add(nodeId);
newCandidates.addAll(v.candidates()
.stream()
.filter(id -> !nodeId.equals(id))
.collect(Collectors.toList()));
return new InternalLeadership(topic,
v.leader(),
v.term(),
System.currentTimeMillis(),
newCandidates);
});
return internalLeadership != null && nodeId.equals(internalLeadership.value().candidates().get(0));
}
@Override
public Leadership getLeadership(String topic) {
return InternalLeadership.toLeadership(Versioned.valueOrNull(leadershipMap.get(topic)));
}
@Override
public Map<String, Leadership> getLeaderships() {
Map<String, Leadership> leaderships = Maps.newHashMap();
leadershipMap.entrySet().forEach(e -> {
leaderships.put(e.getKey(), e.getValue().value().asLeadership());
});
return ImmutableMap.copyOf(leaderships);
}
private static class InternalLeadership {
private final String topic;
private final NodeId leader;
private final long term;
private final long termStartTime;
private final List<NodeId> candidates;
public InternalLeadership(String topic,
NodeId leader,
long term,
long termStartTime,
List<NodeId> candidates) {
this.topic = topic;
this.leader = leader;
this.term = term;
this.termStartTime = termStartTime;
this.candidates = ImmutableList.copyOf(candidates);
}
public NodeId leader() {
return this.leader;
}
public long term() {
return term;
}
public long termStartTime() {
return termStartTime;
}
public List<NodeId> candidates() {
return candidates;
}
public Leadership asLeadership() {
return new Leadership(topic, leader == null ?
null : new Leader(leader, term, termStartTime), candidates);
}
public static Leadership toLeadership(InternalLeadership internalLeadership) {
return internalLeadership == null ? null : internalLeadership.asLeadership();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("leader", leader)
.add("term", term)
.add("termStartTime", termStartTime)
.add("candidates", candidates)
.toString();
}
}
}