| /* |
| * Copyright 2014-2015 Open Networking Laboratory |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.onosproject.store.trivial; |
| |
| import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED; |
| import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED; |
| import static org.slf4j.LoggerFactory.getLogger; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.felix.scr.annotations.Activate; |
| import org.apache.felix.scr.annotations.Component; |
| import org.apache.felix.scr.annotations.Deactivate; |
| import org.apache.felix.scr.annotations.Reference; |
| import org.apache.felix.scr.annotations.ReferenceCardinality; |
| import org.apache.felix.scr.annotations.Service; |
| import org.joda.time.DateTime; |
| import org.onlab.packet.IpAddress; |
| import org.onosproject.cluster.ClusterEventListener; |
| import org.onosproject.cluster.ClusterService; |
| import org.onosproject.cluster.ControllerNode; |
| import org.onosproject.cluster.ControllerNode.State; |
| import org.onosproject.cluster.DefaultControllerNode; |
| import org.onosproject.cluster.NodeId; |
| import org.onosproject.cluster.RoleInfo; |
| import org.onosproject.mastership.MastershipEvent; |
| import org.onosproject.mastership.MastershipStore; |
| import org.onosproject.mastership.MastershipStoreDelegate; |
| import org.onosproject.mastership.MastershipTerm; |
| import org.onosproject.net.DeviceId; |
| import org.onosproject.net.MastershipRole; |
| import org.onosproject.store.AbstractStore; |
| import org.slf4j.Logger; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableSet; |
| |
| /** |
| * Manages inventory of controller mastership over devices using |
| * trivial, non-distributed in-memory structures implementation. |
| */ |
| @Component(immediate = true) |
| @Service |
| public class SimpleMastershipStore |
| extends AbstractStore<MastershipEvent, MastershipStoreDelegate> |
| implements MastershipStore { |
| |
| private final Logger log = getLogger(getClass()); |
| |
| private static final int NOTHING = 0; |
| private static final int INIT = 1; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| protected ClusterService clusterService; |
| |
| //devices mapped to their masters, to emulate multiple nodes |
| protected final Map<DeviceId, NodeId> masterMap = new HashMap<>(); |
| //emulate backups with pile of nodes |
| protected final Map<DeviceId, List<NodeId>> backups = new HashMap<>(); |
| //terms |
| protected final Map<DeviceId, AtomicInteger> termMap = new HashMap<>(); |
| |
| @Activate |
| public void activate() { |
| if (clusterService == null) { |
| // just for ease of unit test |
| final ControllerNode instance = |
| new DefaultControllerNode(new NodeId("local"), |
| IpAddress.valueOf("127.0.0.1")); |
| |
| clusterService = new ClusterService() { |
| |
| private final DateTime creationTime = DateTime.now(); |
| |
| @Override |
| public ControllerNode getLocalNode() { |
| return instance; |
| } |
| |
| @Override |
| public Set<ControllerNode> getNodes() { |
| return ImmutableSet.of(instance); |
| } |
| |
| @Override |
| public ControllerNode getNode(NodeId nodeId) { |
| if (instance.id().equals(nodeId)) { |
| return instance; |
| } |
| return null; |
| } |
| |
| @Override |
| public State getState(NodeId nodeId) { |
| if (instance.id().equals(nodeId)) { |
| return State.ACTIVE; |
| } else { |
| return State.INACTIVE; |
| } |
| } |
| |
| @Override |
| public DateTime getLastUpdated(NodeId nodeId) { |
| return creationTime; |
| } |
| |
| @Override |
| public void addListener(ClusterEventListener listener) { |
| } |
| |
| @Override |
| public void removeListener(ClusterEventListener listener) { |
| } |
| }; |
| } |
| log.info("Started"); |
| } |
| |
| @Deactivate |
| public void deactivate() { |
| log.info("Stopped"); |
| } |
| |
| @Override |
| public synchronized CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) { |
| |
| MastershipRole role = getRole(nodeId, deviceId); |
| switch (role) { |
| case MASTER: |
| // no-op |
| return CompletableFuture.completedFuture(null); |
| case STANDBY: |
| case NONE: |
| NodeId prevMaster = masterMap.put(deviceId, nodeId); |
| incrementTerm(deviceId); |
| removeFromBackups(deviceId, nodeId); |
| addToBackup(deviceId, prevMaster); |
| break; |
| default: |
| log.warn("unknown Mastership Role {}", role); |
| return null; |
| } |
| |
| return CompletableFuture.completedFuture( |
| new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId))); |
| } |
| |
| @Override |
| public NodeId getMaster(DeviceId deviceId) { |
| return masterMap.get(deviceId); |
| } |
| |
| // synchronized for atomic read |
| @Override |
| public synchronized RoleInfo getNodes(DeviceId deviceId) { |
| return new RoleInfo(masterMap.get(deviceId), |
| backups.getOrDefault(deviceId, ImmutableList.of())); |
| } |
| |
| @Override |
| public Set<DeviceId> getDevices(NodeId nodeId) { |
| Set<DeviceId> ids = new HashSet<>(); |
| for (Map.Entry<DeviceId, NodeId> d : masterMap.entrySet()) { |
| if (Objects.equals(d.getValue(), nodeId)) { |
| ids.add(d.getKey()); |
| } |
| } |
| return ids; |
| } |
| |
| @Override |
| public synchronized CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) { |
| //query+possible reelection |
| NodeId node = clusterService.getLocalNode().id(); |
| MastershipRole role = getRole(node, deviceId); |
| |
| switch (role) { |
| case MASTER: |
| return CompletableFuture.completedFuture(MastershipRole.MASTER); |
| case STANDBY: |
| if (getMaster(deviceId) == null) { |
| // no master => become master |
| masterMap.put(deviceId, node); |
| incrementTerm(deviceId); |
| // remove from backup list |
| removeFromBackups(deviceId, node); |
| notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, |
| getNodes(deviceId))); |
| return CompletableFuture.completedFuture(MastershipRole.MASTER); |
| } |
| return CompletableFuture.completedFuture(MastershipRole.STANDBY); |
| case NONE: |
| if (getMaster(deviceId) == null) { |
| // no master => become master |
| masterMap.put(deviceId, node); |
| incrementTerm(deviceId); |
| notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, |
| getNodes(deviceId))); |
| return CompletableFuture.completedFuture(MastershipRole.MASTER); |
| } |
| // add to backup list |
| if (addToBackup(deviceId, node)) { |
| notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, |
| getNodes(deviceId))); |
| } |
| return CompletableFuture.completedFuture(MastershipRole.STANDBY); |
| default: |
| log.warn("unknown Mastership Role {}", role); |
| } |
| return CompletableFuture.completedFuture(role); |
| } |
| |
| // add to backup if not there already, silently ignores null node |
| private synchronized boolean addToBackup(DeviceId deviceId, NodeId nodeId) { |
| boolean modified = false; |
| List<NodeId> stbys = backups.getOrDefault(deviceId, new ArrayList<>()); |
| if (nodeId != null && !stbys.contains(nodeId)) { |
| stbys.add(nodeId); |
| modified = true; |
| } |
| backups.put(deviceId, stbys); |
| return modified; |
| } |
| |
| private synchronized boolean removeFromBackups(DeviceId deviceId, NodeId node) { |
| List<NodeId> stbys = backups.getOrDefault(deviceId, new ArrayList<>()); |
| boolean modified = stbys.remove(node); |
| backups.put(deviceId, stbys); |
| return modified; |
| } |
| |
| private synchronized void incrementTerm(DeviceId deviceId) { |
| AtomicInteger term = termMap.getOrDefault(deviceId, new AtomicInteger(NOTHING)); |
| term.incrementAndGet(); |
| termMap.put(deviceId, term); |
| } |
| |
| @Override |
| public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) { |
| //just query |
| NodeId current = masterMap.get(deviceId); |
| MastershipRole role; |
| |
| if (current != null && current.equals(nodeId)) { |
| return MastershipRole.MASTER; |
| } |
| |
| if (backups.getOrDefault(deviceId, Collections.emptyList()).contains(nodeId)) { |
| role = MastershipRole.STANDBY; |
| } else { |
| role = MastershipRole.NONE; |
| } |
| return role; |
| } |
| |
| // synchronized for atomic read |
| @Override |
| public synchronized MastershipTerm getTermFor(DeviceId deviceId) { |
| if ((termMap.get(deviceId) == null)) { |
| return MastershipTerm.of(masterMap.get(deviceId), NOTHING); |
| } |
| return MastershipTerm.of( |
| masterMap.get(deviceId), termMap.get(deviceId).get()); |
| } |
| |
| @Override |
| public synchronized CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) { |
| MastershipRole role = getRole(nodeId, deviceId); |
| switch (role) { |
| case MASTER: |
| NodeId backup = reelect(deviceId, nodeId); |
| if (backup == null) { |
| // no master alternative |
| masterMap.remove(deviceId); |
| // TODO: Should there be new event type for no MASTER? |
| return CompletableFuture.completedFuture( |
| new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId))); |
| } else { |
| NodeId prevMaster = masterMap.put(deviceId, backup); |
| incrementTerm(deviceId); |
| addToBackup(deviceId, prevMaster); |
| return CompletableFuture.completedFuture( |
| new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId))); |
| } |
| |
| case STANDBY: |
| case NONE: |
| boolean modified = addToBackup(deviceId, nodeId); |
| if (modified) { |
| return CompletableFuture.completedFuture( |
| new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId))); |
| } |
| break; |
| |
| default: |
| log.warn("unknown Mastership Role {}", role); |
| } |
| return null; |
| } |
| |
| //dumbly selects next-available node that's not the current one |
| //emulate leader election |
| private synchronized NodeId reelect(DeviceId did, NodeId nodeId) { |
| List<NodeId> stbys = backups.getOrDefault(did, Collections.emptyList()); |
| NodeId backup = null; |
| for (NodeId n : stbys) { |
| if (!n.equals(nodeId)) { |
| backup = n; |
| break; |
| } |
| } |
| stbys.remove(backup); |
| return backup; |
| } |
| |
| @Override |
| public synchronized CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) { |
| MastershipRole role = getRole(nodeId, deviceId); |
| switch (role) { |
| case MASTER: |
| NodeId backup = reelect(deviceId, nodeId); |
| masterMap.put(deviceId, backup); |
| incrementTerm(deviceId); |
| return CompletableFuture.completedFuture( |
| new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId))); |
| |
| case STANDBY: |
| if (removeFromBackups(deviceId, nodeId)) { |
| return CompletableFuture.completedFuture( |
| new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId))); |
| } |
| break; |
| |
| case NONE: |
| break; |
| |
| default: |
| log.warn("unknown Mastership Role {}", role); |
| } |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| @Override |
| public synchronized void relinquishAllRole(NodeId nodeId) { |
| List<CompletableFuture<MastershipEvent>> eventFutures = new ArrayList<>(); |
| Set<DeviceId> toRelinquish = new HashSet<>(); |
| |
| masterMap.entrySet().stream() |
| .filter(entry -> nodeId.equals(entry.getValue())) |
| .forEach(entry -> toRelinquish.add(entry.getKey())); |
| |
| backups.entrySet().stream() |
| .filter(entry -> entry.getValue().contains(nodeId)) |
| .forEach(entry -> toRelinquish.add(entry.getKey())); |
| |
| toRelinquish.forEach(deviceId -> eventFutures.add(relinquishRole(nodeId, deviceId))); |
| |
| eventFutures.forEach(future -> { |
| future.whenComplete((event, error) -> notifyDelegate(event)); |
| }); |
| } |
| } |