// blob: b2c5ade724fc855bcbd0fd1965c81a8eb8066b83 [file] [log] [blame]
/*
* Copyright 2014-2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.mastership.impl;
import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.putIfAbsent;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.RoleInfo;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipStore;
import org.onosproject.mastership.MastershipStoreDelegate;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.store.hz.AbstractHazelcastStore;
import org.onosproject.store.hz.SMap;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onlab.util.KryoNamespace;
import com.google.common.base.Objects;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.MapEvent;
import static org.onosproject.net.MastershipRole.*;
/**
* Distributed implementation of the mastership store. The store is
* responsible for the master selection process.
*/
@Component(immediate = true, enabled = false)
@Service
public class DistributedMastershipStore
extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
implements MastershipStore {
// Term number reported by getTermFor() when no term is recorded for a device,
// i.e. a master has never been chosen yet.
private static final Integer NOTHING = 0;
// Term value written by updateTerm() the first time a device receives a term.
private static final Integer INIT = 1;
// Name of the Hazelcast map backing roleMap (device -> node roles).
private static final String NODE_ROLES_MAP_NAME = "nodeRoles";
// Device -> RoleValue (which node holds which role); created in activate().
// All read-modify-write sequences in this class hold roleMap.lock(deviceId).
protected SMap<DeviceId, RoleValue> roleMap;
// Name of the Hazelcast map backing terms (device -> term number).
private static final String TERMS_MAP_NAME = "terms";
// Device -> current mastership term; created in activate() and advanced by updateTerm().
protected SMap<DeviceId, Integer> terms;
// Used by requestRole() to identify the local node.
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
// Registration id returned by roleMap.addEntryListener() in activate();
// needed to remove that listener again in deactivate().
private String listenerId;
/**
 * Brings the store up: installs a serializer that knows about
 * {@link RoleValue}, raises the async backup count of both backing maps,
 * wraps the Hazelcast maps and registers the role-map entry listener.
 */
@Override
@Activate
public void activate() {
    super.activate();

    // Replace the inherited serializer with one that can also handle RoleValue.
    this.serializer = new KryoSerializer() {
        @Override
        protected void setupKryoPool() {
            serializerPool = KryoNamespace.newBuilder()
                    .register(KryoNamespaces.API)
                    .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
                    .register(new RoleValueSerializer(), RoleValue.class)
                    .build();
        }
    };

    final Config hzConfig = theInstance.getConfig();

    // Async backups fill whatever is left of MAX_BACKUP_COUNT after the
    // configured (synchronous) backup count.
    final MapConfig rolesConfig = hzConfig.getMapConfig(NODE_ROLES_MAP_NAME);
    final int rolesAsyncBackups = MapConfig.MAX_BACKUP_COUNT - rolesConfig.getBackupCount();
    rolesConfig.setAsyncBackupCount(rolesAsyncBackups);

    final MapConfig termsConfig = hzConfig.getMapConfig(TERMS_MAP_NAME);
    final int termsAsyncBackups = MapConfig.MAX_BACKUP_COUNT - termsConfig.getBackupCount();
    termsConfig.setAsyncBackupCount(termsAsyncBackups);

    roleMap = new SMap<>(theInstance.<byte[], byte[]>getMap(NODE_ROLES_MAP_NAME), this.serializer);
    final RemoteMasterShipEventHandler handler = new RemoteMasterShipEventHandler();
    listenerId = roleMap.addEntryListener(handler, true);

    terms = new SMap<>(theInstance.<byte[], byte[]>getMap(TERMS_MAP_NAME), this.serializer);

    log.info("Started");
}
/**
 * Shuts the store down by unregistering the role-map entry listener that
 * was registered in {@link #activate()}.
 */
@Deactivate
public void deactivate() {
// listenerId is the registration handle obtained from addEntryListener()
roleMap.removeEntryListener(listenerId);
log.info("Stopped");
}
/**
 * Looks up the role a node currently holds for a device.
 *
 * @param nodeId   node being queried
 * @param deviceId device being queried
 * @return the role recorded for the node, or NONE when the device has no
 *         role entry at all
 */
@Override
public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
    final RoleValue roles = roleMap.get(deviceId);
    return (roles == null) ? NONE : roles.getRole(nodeId);
}
/**
 * Makes {@code newMaster} the master of a device, holding the per-device
 * lock for the whole read-modify-write sequence.
 *
 * @param newMaster node to promote
 * @param deviceId  device whose mastership is being set
 * @return a completed future carrying a MASTER_CHANGED event when the
 *         master actually changed, or {@code null} when the node already
 *         was master (or its role was not recognised)
 */
@Override
public CompletableFuture<MastershipEvent> setMaster(NodeId newMaster, DeviceId deviceId) {
    roleMap.lock(deviceId);
    try {
        final RoleValue roles = getRoleValue(deviceId);
        final MastershipRole roleOfNewMaster = roles.getRole(newMaster);
        MastershipEvent outcome = null;

        switch (roleOfNewMaster) {
            case MASTER:
                // Already master: only repair the record if the node is
                // also listed as STANDBY, which should never happen.
                if (roles.reassign(newMaster, STANDBY, NONE)) {
                    roleMap.put(deviceId, roles);
                    log.warn("{} was in both MASTER and STANDBY for {}", newMaster, deviceId);
                    // trigger BACKUPS_CHANGED?
                }
                break;

            case STANDBY:
            case NONE:
                final NodeId incumbent = roles.get(MASTER);
                if (incumbent == null) {
                    // nobody to demote, simply record the new master
                    roles.add(MASTER, newMaster);
                } else {
                    // demote the incumbent to STANDBY, then swap the MASTER slot
                    roles.reassign(incumbent, NONE, STANDBY);
                    roles.replace(incumbent, newMaster, MASTER);
                }
                // the new master must no longer be listed as STANDBY
                roles.reassign(newMaster, STANDBY, NONE);
                updateTerm(deviceId);
                roleMap.put(deviceId, roles);
                outcome = new MastershipEvent(MASTER_CHANGED, deviceId, roles.roleInfo());
                break;

            default:
                log.warn("unknown Mastership Role {}", roleOfNewMaster);
                break;
        }
        return CompletableFuture.completedFuture(outcome);
    } finally {
        roleMap.unlock(deviceId);
    }
}
/**
 * Returns the node currently recorded as master of a device.
 *
 * @param deviceId device being queried
 * @return the master node, or {@code null} when the device has no role
 *         entry or no master
 */
@Override
public NodeId getMaster(DeviceId deviceId) {
    final RoleValue roles = roleMap.get(deviceId);
    return (roles == null) ? null : roles.get(MASTER);
}
/**
 * Returns the role information recorded for a device.
 *
 * @param deviceId device being queried
 * @return the device's role info, or a default-constructed {@link RoleInfo}
 *         when the device has no role entry
 */
@Override
public RoleInfo getNodes(DeviceId deviceId) {
    final RoleValue roles = roleMap.get(deviceId);
    return (roles == null) ? new RoleInfo() : roles.roleInfo();
}
/**
 * Collects every device whose recorded master is the given node.
 *
 * @param nodeId node being queried
 * @return a new mutable set of matching devices (empty when there are none)
 */
@Override
public Set<DeviceId> getDevices(NodeId nodeId) {
    final Set<DeviceId> mastered = new HashSet<>();
    for (Map.Entry<DeviceId, RoleValue> mapping : roleMap.entrySet()) {
        final NodeId master = mapping.getValue().get(MASTER);
        if (!nodeId.equals(master)) {
            continue;
        }
        mastered.add(mapping.getKey());
    }
    return mastered;
}
/**
 * Requests a role for the local node on a device.
 * <p>
 * When the device has no master the local node becomes master. Otherwise
 * the local node keeps MASTER or STANDBY if it already holds it, and is
 * added to STANDBY if it held NONE.
 *
 * @param deviceId device for which a role is requested
 * @return a completed future carrying the role the local node ends up with
 */
@Override
public CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
    final NodeId self = clusterService.getLocalNode().id();

    roleMap.lock(deviceId);
    try {
        final RoleValue roles = getRoleValue(deviceId);

        if (roles.get(MASTER) == null) {
            // No master yet: leave STANDBY (if listed there) and take over.
            roles.reassign(self, STANDBY, NONE);
            roles.add(MASTER, self);
            updateTerm(deviceId);
            roleMap.put(deviceId, roles);
            return CompletableFuture.completedFuture(MASTER);
        }

        final MastershipRole held = roles.getRole(self);
        MastershipRole granted = held;

        switch (held) {
            case MASTER:
                // Integrity check: should never also be in STANDBY,
                // but heal the record if we happen to be.
                if (roles.reassign(self, STANDBY, NONE)) {
                    log.warn("{} was in both MASTER and STANDBY for {}", self, deviceId);
                    roleMap.put(deviceId, roles);
                    // trigger BACKUPS_CHANGED?
                }
                break;

            case STANDBY:
                // Integrity check: should never also be in NONE,
                // but heal the record if we happen to be.
                if (roles.reassign(self, NONE, STANDBY)) {
                    log.warn("{} was in both NONE and STANDBY for {}", self, deviceId);
                    roleMap.put(deviceId, roles);
                    // trigger BACKUPS_CHANGED?
                }
                break;

            case NONE:
                roles.reassign(self, NONE, STANDBY);
                roleMap.put(deviceId, roles);
                // TODO: notifyDelegate BACKUPS_CHANGED
                granted = STANDBY;
                break;

            default:
                log.warn("unknown Mastership Role {}", held);
                break;
        }
        return CompletableFuture.completedFuture(granted);
    } finally {
        roleMap.unlock(deviceId);
    }
}
/**
 * Returns the current mastership term of a device together with its master.
 * <p>
 * Term and role are read while holding the per-device lock so that the
 * pair is consistent with respect to the mutating operations, which update
 * both maps under the same lock.
 * <p>
 * This is a pure read: it deliberately does NOT go through
 * {@code getRoleValue()}, because that helper inserts an empty
 * {@link RoleValue} for unknown devices. Such an insert modifies the
 * distributed map and is reported to the entry listener registered in
 * {@code activate()}, which would publish an event for a mere query.
 *
 * @param deviceId device being queried
 * @return the (master, term) pair; when no term has been recorded yet the
 *         result is a term with a {@code null} master and number NOTHING
 */
@Override
public MastershipTerm getTermFor(DeviceId deviceId) {
    roleMap.lock(deviceId);
    try {
        final Integer term = terms.get(deviceId);
        if (term == null) {
            // master has never been chosen for this device
            return MastershipTerm.of(null, NOTHING);
        }
        final RoleValue rv = roleMap.get(deviceId);
        final NodeId master = (rv == null) ? null : rv.get(MASTER);
        return MastershipTerm.of(master, term);
    } finally {
        roleMap.unlock(deviceId);
    }
}
/**
 * Puts a node into STANDBY for a device.
 * <ul>
 * <li>MASTER: mastership is handed to another standby node if one exists,
 *     the node is added to STANDBY and the term is advanced;</li>
 * <li>STANDBY: nothing to do;</li>
 * <li>NONE: the node is added to STANDBY.</li>
 * </ul>
 *
 * @param nodeId   node to demote / register as standby
 * @param deviceId device concerned
 * @return a completed future carrying MASTER_CHANGED (node was master),
 *         BACKUPS_CHANGED (node was NONE) or {@code null} (no change)
 */
@Override
public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
    roleMap.lock(deviceId);
    try {
        final RoleValue rv = getRoleValue(deviceId);
        // Derive the role from the value just fetched under the lock, as the
        // sibling operations do, instead of reading the map a second time.
        final MastershipRole currentRole = rv.getRole(nodeId);
        switch (currentRole) {
            case MASTER:
                // reelect() either promotes a standby node or, when there is
                // no candidate, just removes nodeId from MASTER.
                reelect(nodeId, deviceId, rv);
                rv.reassign(nodeId, NONE, STANDBY);
                updateTerm(deviceId);
                roleMap.put(deviceId, rv);
                // The master changed in both outcomes (new master or none).
                // TBD: Should there be new event type for no MASTER?
                return CompletableFuture.completedFuture(
                        new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
            case STANDBY:
                return CompletableFuture.completedFuture(null);
            case NONE:
                rv.reassign(nodeId, NONE, STANDBY);
                roleMap.put(deviceId, rv);
                return CompletableFuture.completedFuture(
                        new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo()));
            default:
                log.warn("unknown Mastership Role {}", currentRole);
        }
        return CompletableFuture.completedFuture(null);
    } finally {
        roleMap.unlock(deviceId);
    }
}
/**
 * Drops whatever role a node holds for a device (i.e. sets it to NONE).
 * A relinquishing master hands mastership to another standby node when one
 * exists; a standby node is simply removed from STANDBY.
 *
 * @param nodeId   node giving up its role
 * @param deviceId device concerned
 * @return a completed future carrying MASTER_CHANGED when a new master was
 *         elected, BACKUPS_CHANGED when the standby list changed, and
 *         {@code null} otherwise (including a master leaving with no
 *         candidate to replace it)
 */
@Override
public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
    roleMap.lock(deviceId);
    try {
        final RoleValue roles = getRoleValue(deviceId);
        final MastershipRole held = roles.getRole(nodeId);
        MastershipEvent outcome = null;

        switch (held) {
            case MASTER:
                final boolean handedOver = reelect(nodeId, deviceId, roles) != null;
                if (handedOver) {
                    updateTerm(deviceId);
                }
                // Persist in both cases; without a candidate there are no
                // more backups and the device is likely fully disconnected.
                roleMap.put(deviceId, roles);
                if (handedOver) {
                    outcome = new MastershipEvent(MASTER_CHANGED, deviceId, roles.roleInfo());
                }
                // Should there be new event type for the no-candidate case?
                break;

            case STANDBY:
                // fall through to reinforce relinquishment
            case NONE:
                if (roles.reassign(nodeId, STANDBY, NONE)) {
                    roleMap.put(deviceId, roles);
                    outcome = new MastershipEvent(BACKUPS_CHANGED, deviceId, roles.roleInfo());
                }
                break;

            default:
                log.warn("unknown Mastership Role {}", held);
                break;
        }
        return CompletableFuture.completedFuture(outcome);
    } finally {
        roleMap.unlock(deviceId);
    }
}
/**
 * Makes a node relinquish its role on every device for which it is listed
 * as MASTER or STANDBY, then hands all resulting events to the delegate in
 * one batch.
 *
 * @param nodeId node giving up all of its roles
 */
@Override
public void relinquishAllRole(NodeId nodeId) {
    final List<MastershipEvent> collected = new ArrayList<>();

    for (Entry<DeviceId, RoleValue> mapping : roleMap.entrySet()) {
        final RoleValue roles = mapping.getValue();
        final boolean involved =
                roles.contains(MASTER, nodeId) || roles.contains(STANDBY, nodeId);
        if (!involved) {
            continue;
        }
        // relinquishRole() yields null when nothing worth reporting happened
        relinquishRole(nodeId, mapping.getKey()).whenComplete((event, error) -> {
            if (event != null) {
                collected.add(event);
            }
        });
    }

    notifyDelegate(collected);
}
// TODO: Consider moving this to RoleValue method
/**
 * Picks a replacement master for a device from its STANDBY nodes and
 * updates {@code rv} accordingly. The caller is responsible for writing
 * {@code rv} back to the role map.
 *
 * @param current  node that is giving up mastership
 * @param deviceId device concerned (used for logging only)
 * @param rv       role record to modify in place
 * @return the newly promoted node, or {@code null} when no standby node
 *         other than {@code current} exists (in which case {@code current}
 *         is just removed from MASTER)
 */
private NodeId reelect(
        NodeId current, DeviceId deviceId, RoleValue rv) {
    // first standby node that is not the outgoing master
    // (if this is an queue it'd be neater.)
    NodeId successor = null;
    for (NodeId standby : rv.nodesOfRole(STANDBY)) {
        if (current.equals(standby)) {
            continue;
        }
        successor = standby;
        break;
    }

    if (successor != null) {
        log.info("{} trying to pass mastership for {} to {}", current, deviceId, successor);
        rv.replace(current, successor, MASTER);
        rv.reassign(successor, STANDBY, NONE);
        return successor;
    }

    // master did change, but there is no master candidate.
    log.info("{} giving up and going to NONE for {}", current, deviceId);
    rv.remove(MASTER, current);
    return null;
}
/**
 * Returns the role record of a device, inserting an empty one into the
 * role map first when the device has none.
 *
 * @param deviceId device being looked up
 * @return the existing record, the record another writer inserted
 *         concurrently, or the freshly inserted empty record
 */
private RoleValue getRoleValue(DeviceId deviceId) {
    final RoleValue existing = roleMap.get(deviceId);
    if (existing != null) {
        return existing;
    }
    final RoleValue fresh = new RoleValue();
    final RoleValue raced = roleMap.putIfAbsent(deviceId, fresh);
    return (raced != null) ? raced : fresh;
}
/**
 * Returns the node that {@code RoleValue.get(role)} yields for a device.
 *
 * @param role     role being queried
 * @param deviceId device being queried
 * @return the node for that role, or {@code null} when the device has no
 *         role record
 */
private NodeId getNode(MastershipRole role, DeviceId deviceId) {
    final RoleValue roles = roleMap.get(deviceId);
    return (roles == null) ? null : roles.get(role);
}
/**
 * Creates the term of a device (value INIT) or advances it by one.
 * Must be guarded by {@code roleMap.lock(deviceId)}.
 *
 * @param deviceId device whose term is to be created or incremented
 */
private void updateTerm(DeviceId deviceId) {
    Integer observed = terms.get(deviceId);
    if (observed == null) {
        observed = terms.putIfAbsent(deviceId, INIT);
        if (observed == null) {
            // initial term set successfully
            return;
        }
        // somebody else initialised the term first; go on and increment it
    }

    Integer advanced = observed + 1;
    // compare-and-set; on failure re-read the stored term and try again
    while (!terms.replace(deviceId, observed, advanced)) {
        observed = terms.get(deviceId);
        if (observed == null) {
            // something is very wrong, but write something to avoid
            // infinite loop.
            log.warn("Term info for {} disappeared.", deviceId);
            observed = putIfAbsent(terms, deviceId, advanced);
        }
        advanced = observed + 1;
    }
}
private class RemoteMasterShipEventHandler implements EntryListener<DeviceId, RoleValue> {
@Override
public void entryAdded(EntryEvent<DeviceId, RoleValue> event) {
entryUpdated(event);
}
@Override
public void entryRemoved(EntryEvent<DeviceId, RoleValue> event) {
}
@Override
public void entryUpdated(EntryEvent<DeviceId, RoleValue> event) {
// compare old and current RoleValues. If master is different,
// emit MASTER_CHANGED. else, emit BACKUPS_CHANGED.
RoleValue oldValue = event.getOldValue();
RoleValue newValue = event.getValue();
// There will be no oldValue at the very first instance of an EntryEvent.
// Technically, the progression is: null event -> null master -> some master;
// We say a null master and a null oldValue are the same condition.
NodeId oldMaster = null;
if (oldValue != null) {
oldMaster = oldValue.get(MASTER);
}
NodeId newMaster = newValue.get(MASTER);
if (!Objects.equal(oldMaster, newMaster)) {
notifyDelegate(new MastershipEvent(
MASTER_CHANGED, event.getKey(), event.getValue().roleInfo()));
} else {
notifyDelegate(new MastershipEvent(
BACKUPS_CHANGED, event.getKey(), event.getValue().roleInfo()));
}
}
@Override
public void entryEvicted(EntryEvent<DeviceId, RoleValue> event) {
}
@Override
public void mapEvicted(MapEvent event) {
}
@Override
public void mapCleared(MapEvent event) {
}
}
}