| package org.onlab.onos.store.mastership.impl; |
| |
import static org.onlab.onos.mastership.MastershipEvent.Type.MASTER_CHANGED;
import static org.onlab.onos.net.MastershipRole.*;
| |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.felix.scr.annotations.Activate; |
| import org.apache.felix.scr.annotations.Component; |
| import org.apache.felix.scr.annotations.Deactivate; |
| import org.apache.felix.scr.annotations.Reference; |
| import org.apache.felix.scr.annotations.ReferenceCardinality; |
| import org.apache.felix.scr.annotations.Service; |
| import org.onlab.onos.cluster.ClusterService; |
| import org.onlab.onos.cluster.NodeId; |
| import org.onlab.onos.cluster.RoleInfo; |
| import org.onlab.onos.mastership.MastershipEvent; |
| import org.onlab.onos.mastership.MastershipStore; |
| import org.onlab.onos.mastership.MastershipStoreDelegate; |
| import org.onlab.onos.mastership.MastershipTerm; |
| import org.onlab.onos.net.DeviceId; |
| import org.onlab.onos.net.MastershipRole; |
| import org.onlab.onos.store.common.AbstractHazelcastStore; |
| import org.onlab.onos.store.common.SMap; |
| import org.onlab.onos.store.serializers.KryoNamespaces; |
| import org.onlab.onos.store.serializers.KryoSerializer; |
| import org.onlab.util.KryoNamespace; |
| |
| import com.google.common.collect.ImmutableSet; |
| import com.hazelcast.core.EntryEvent; |
| import com.hazelcast.core.EntryListener; |
| import com.hazelcast.core.IAtomicLong; |
| import com.hazelcast.core.MapEvent; |
| |
| /** |
| * Distributed implementation of the mastership store. The store is |
| * responsible for the master selection process. |
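 * <p>
 * A typical interaction, as a sketch ({@code store} and {@code deviceId} are
 * hypothetical handles, for illustration only):
 * <pre>
 * MastershipRole role = store.requestRole(deviceId);    // MASTER or STANDBY
 * if (role == MastershipRole.MASTER) {
 *     MastershipTerm term = store.getTermFor(deviceId); // master and term number
 * }
 * </pre>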
| */ |
| @Component(immediate = true) |
| @Service |
| public class DistributedMastershipStore |
| extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate> |
| implements MastershipStore { |
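
    /*
     * Consistency notes: every mutation of a device's RoleValue happens
     * between roleMap.lock(deviceId) and roleMap.unlock(deviceId), and term
     * updates likewise under the terms map's per-device lock. Assuming SMap
     * delegates to Hazelcast's distributed map locks, this serializes role
     * and term changes for each device across the cluster.
     */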
| |
    //initial term value
| private static final Integer INIT = 0; |
| |
| //device to node roles |
| protected SMap<DeviceId, RoleValue> roleMap; |
| //devices to terms |
| protected SMap<DeviceId, Integer> terms; |
| //last-known cluster size, used for tie-breaking when partitioning occurs |
| protected IAtomicLong clusterSize; |
| |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| protected ClusterService clusterService; |
| |
| @SuppressWarnings({ "unchecked", "rawtypes" }) |
| @Override |
| @Activate |
| public void activate() { |
| super.activate(); |
| |
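        // Kryo serializer for the Hazelcast-backed maps: registers the common
        // API types plus a custom serializer for the RoleValue structure.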
| this.serializer = new KryoSerializer() { |
| @Override |
| protected void setupKryoPool() { |
                serializerPool = KryoNamespace.newBuilder()
                        .register(KryoNamespaces.API)
                        .register(RoleValue.class, new RoleValueSerializer())
                        .build()
                        .populate(1);
| } |
| }; |
| |
        roleMap = new SMap(theInstance.getMap("nodeRoles"), this.serializer);
        roleMap.addEntryListener(new RemoteMastershipEventHandler(), true);
| terms = new SMap(theInstance.getMap("terms"), this.serializer); |
| clusterSize = theInstance.getAtomicLong("clustersize"); |
| |
| log.info("Started"); |
| } |
| |
| @Deactivate |
| public void deactivate() { |
| log.info("Stopped"); |
| } |
| |
| @Override |
| public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) { |
        final RoleValue roleValue = getRoleValue(deviceId);
        if (roleValue.contains(MASTER, nodeId)) {
            return MASTER;
        }
        if (roleValue.contains(STANDBY, nodeId)) {
| return STANDBY; |
| } |
| return NONE; |
| } |
| |
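    // Promotes nodeId to MASTER for deviceId, demoting any current master to
    // STANDBY and advancing the term; returns a MASTER_CHANGED event, or null
    // if nodeId was already the master.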
| @Override |
| public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) { |
| |
| MastershipRole role = getRole(nodeId, deviceId); |
| roleMap.lock(deviceId); |
| try { |
| RoleValue rv = getRoleValue(deviceId); |
| switch (role) { |
| case MASTER: |
                // already the master; just ensure it is not also listed as a standby
| rv.reassign(nodeId, STANDBY, NONE); |
| roleMap.put(deviceId, rv); |
| return null; |
| case STANDBY: |
| case NONE: |
| NodeId current = rv.get(MASTER); |
| if (current != null) { |
| //backup and replace current master |
| rv.reassign(current, NONE, STANDBY); |
| rv.replace(current, nodeId, MASTER); |
| } else { |
                    // no current master; simply assign one
| rv.add(MASTER, nodeId); |
| } |
| rv.reassign(nodeId, STANDBY, NONE); |
| roleMap.put(deviceId, rv); |
| updateTerm(deviceId); |
| return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()); |
| default: |
| log.warn("unknown Mastership Role {}", role); |
| return null; |
| } |
| } finally { |
| roleMap.unlock(deviceId); |
| } |
| } |
| |
| @Override |
| public NodeId getMaster(DeviceId deviceId) { |
| return getNode(MASTER, deviceId); |
| } |
| |
| |
| @Override |
| public RoleInfo getNodes(DeviceId deviceId) { |
| roleMap.lock(deviceId); |
| try { |
| RoleValue rv = getRoleValue(deviceId); |
| return rv.roleInfo(); |
| } finally { |
| roleMap.unlock(deviceId); |
| } |
| } |
| |
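    // Note: a linear scan over all entries of the distributed role map.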
| @Override |
| public Set<DeviceId> getDevices(NodeId nodeId) { |
| ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder(); |
| |
| for (Map.Entry<DeviceId, RoleValue> el : roleMap.entrySet()) { |
| if (nodeId.equals(el.getValue().get(MASTER))) { |
| builder.add(el.getKey()); |
| } |
| } |
| |
| return builder.build(); |
| } |
| |
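    // The local node requests a role for deviceId: it claims mastership if
    // the device has no master yet, otherwise it becomes (or remains) a
    // standby.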
| @Override |
| public MastershipRole requestRole(DeviceId deviceId) { |
| NodeId local = clusterService.getLocalNode().id(); |
| |
| roleMap.lock(deviceId); |
| try { |
| RoleValue rv = getRoleValue(deviceId); |
| MastershipRole role = getRole(local, deviceId); |
| switch (role) { |
            case MASTER:
                rv.reassign(local, STANDBY, NONE);
                roleMap.put(deviceId, rv);
                terms.putIfAbsent(deviceId, INIT);
                break;
            case STANDBY:
                rv.reassign(local, NONE, STANDBY);
                roleMap.put(deviceId, rv);
                terms.putIfAbsent(deviceId, INIT);
                break;
            case NONE:
                // either we are the first standby, or the first node to see
                // the device; in the latter case, claim mastership.
| if (rv.get(MASTER) == null) { |
| rv.add(MASTER, local); |
| rv.reassign(local, STANDBY, NONE); |
| updateTerm(deviceId); |
| role = MastershipRole.MASTER; |
| } else { |
| rv.add(STANDBY, local); |
| rv.reassign(local, NONE, STANDBY); |
| role = MastershipRole.STANDBY; |
| } |
| roleMap.put(deviceId, rv); |
| break; |
| default: |
| log.warn("unknown Mastership Role {}", role); |
| } |
| return role; |
| } finally { |
| roleMap.unlock(deviceId); |
| } |
| } |
| |
| @Override |
| public MastershipTerm getTermFor(DeviceId deviceId) { |
        RoleValue rv = getRoleValue(deviceId);
        // read the master and term once, so the null checks and the returned
        // values cannot diverge under concurrent updates
        NodeId master = rv.get(MASTER);
        Integer term = terms.get(deviceId);
        if ((master == null) || (term == null)) {
            return null;
        }
        return MastershipTerm.of(master, term);
| } |
| |
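    // Demotes nodeId to STANDBY for deviceId; if it was the master, a new
    // master is first elected from the remaining standbys.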
| @Override |
| public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) { |
| MastershipEvent event = null; |
| |
| roleMap.lock(deviceId); |
| try { |
| RoleValue rv = getRoleValue(deviceId); |
| MastershipRole role = getRole(nodeId, deviceId); |
| switch (role) { |
| case MASTER: |
| event = reelect(nodeId, deviceId, rv); |
| //fall through to reinforce role |
| case STANDBY: |
| //fall through to reinforce role |
| case NONE: |
| rv.reassign(nodeId, NONE, STANDBY); |
| roleMap.put(deviceId, rv); |
| break; |
| default: |
| log.warn("unknown Mastership Role {}", role); |
| } |
| return event; |
| } finally { |
| roleMap.unlock(deviceId); |
| } |
| } |
| |
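    // Drops nodeId to NONE for deviceId; if it was the master, a new master
    // is first elected from the remaining standbys.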
| @Override |
| public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) { |
| MastershipEvent event = null; |
| |
| roleMap.lock(deviceId); |
| try { |
| RoleValue rv = getRoleValue(deviceId); |
| MastershipRole role = getRole(nodeId, deviceId); |
| switch (role) { |
| case MASTER: |
| event = reelect(nodeId, deviceId, rv); |
| //fall through to reinforce relinquishment |
| case STANDBY: |
| //fall through to reinforce relinquishment |
| case NONE: |
| rv.reassign(nodeId, STANDBY, NONE); |
| roleMap.put(deviceId, rv); |
| break; |
| default: |
| log.warn("unknown Mastership Role {}", role); |
| } |
| return event; |
| } finally { |
| roleMap.unlock(deviceId); |
| } |
| } |
| |
| //helper to fetch a new master candidate for a given device. |
| private MastershipEvent reelect( |
| NodeId current, DeviceId deviceId, RoleValue rv) { |
| |
        // if this were a queue, picking the next standby would be neater.
| NodeId backup = null; |
| for (NodeId n : rv.nodesOfRole(STANDBY)) { |
| if (!current.equals(n)) { |
| backup = n; |
| break; |
| } |
| } |
| |
| if (backup == null) { |
| log.info("{} giving up and going to NONE for {}", current, deviceId); |
| rv.remove(MASTER, current); |
| roleMap.put(deviceId, rv); |
| return null; |
| } else { |
| log.info("{} trying to pass mastership for {} to {}", current, deviceId, backup); |
| rv.replace(current, backup, MASTER); |
| rv.reassign(backup, STANDBY, NONE); |
| roleMap.put(deviceId, rv); |
            updateTerm(deviceId);
| return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()); |
| } |
| } |
| |
    // returns the RoleValue for a device, creating and storing one if absent.
| private RoleValue getRoleValue(DeviceId deviceId) { |
| RoleValue value = roleMap.get(deviceId); |
| if (value == null) { |
| value = new RoleValue(); |
| RoleValue concurrentlyAdded = roleMap.putIfAbsent(deviceId, value); |
| if (concurrentlyAdded != null) { |
| return concurrentlyAdded; |
| } |
| } |
| return value; |
| } |
| |
    // returns the node holding the given role for a device, or null if none.
| private NodeId getNode(MastershipRole role, DeviceId deviceId) { |
| RoleValue value = roleMap.get(deviceId); |
| if (value != null) { |
| return value.get(role); |
| } |
| return null; |
| } |
| |
| //adds or updates term information. |
| private void updateTerm(DeviceId deviceId) { |
| terms.lock(deviceId); |
| try { |
| Integer term = terms.get(deviceId); |
| if (term == null) { |
| terms.put(deviceId, INIT); |
| } else { |
| terms.put(deviceId, ++term); |
| } |
| } finally { |
| terms.unlock(deviceId); |
| } |
| } |
| |
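    /**
     * Translates updates to the role map into MastershipEvents for the local
     * delegate, skipping updates for which this node is the (new) master.
     */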
    private class RemoteMastershipEventHandler implements EntryListener<DeviceId, RoleValue> {
| |
| @Override |
| public void entryAdded(EntryEvent<DeviceId, RoleValue> event) { |
| } |
| |
| @Override |
| public void entryRemoved(EntryEvent<DeviceId, RoleValue> event) { |
| } |
| |
| @Override |
| public void entryUpdated(EntryEvent<DeviceId, RoleValue> event) { |
| NodeId myId = clusterService.getLocalNode().id(); |
| NodeId node = event.getValue().get(MASTER); |
| if (myId.equals(node)) { |
| // XXX or do we just let it get sent and caught by ourself? |
| return; |
| } |
| notifyDelegate(new MastershipEvent( |
| MASTER_CHANGED, event.getKey(), event.getValue().roleInfo())); |
| } |
| |
| @Override |
| public void entryEvicted(EntryEvent<DeviceId, RoleValue> event) { |
| } |
| |
| @Override |
| public void mapEvicted(MapEvent event) { |
| } |
| |
| @Override |
| public void mapCleared(MapEvent event) { |
| } |
| } |
| |
| } |