Madan Jampani | e5723dc | 2015-08-31 14:23:58 -0700 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2015 Open Networking Laboratory |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | package org.onosproject.store.consistent.impl; |
| 17 | |
| 18 | import static org.slf4j.LoggerFactory.getLogger; |
| 19 | |
| 20 | import java.util.Arrays; |
| 21 | import java.util.List; |
| 22 | import java.util.Map; |
| 23 | import java.util.concurrent.CompletableFuture; |
| 24 | import java.util.concurrent.Executor; |
| 25 | |
| 26 | import org.apache.felix.scr.annotations.Activate; |
| 27 | import org.apache.felix.scr.annotations.Component; |
| 28 | import org.apache.felix.scr.annotations.Deactivate; |
| 29 | import org.apache.felix.scr.annotations.Reference; |
| 30 | import org.apache.felix.scr.annotations.ReferenceCardinality; |
| 31 | import org.apache.felix.scr.annotations.Service; |
| 32 | import org.onlab.util.Tools; |
| 33 | import org.onosproject.cluster.ClusterEvent; |
| 34 | import org.onosproject.cluster.ClusterEventListener; |
| 35 | import org.onosproject.cluster.ClusterService; |
| 36 | import org.onosproject.cluster.ControllerNode.State; |
| 37 | import org.onosproject.cluster.NodeId; |
| 38 | import org.onosproject.store.serializers.KryoNamespaces; |
| 39 | import org.onosproject.store.service.ConsistentMap; |
| 40 | import org.onosproject.store.service.ConsistentMapException; |
| 41 | import org.onosproject.store.service.MapEvent; |
| 42 | import org.onosproject.store.service.MapEventListener; |
| 43 | import org.onosproject.store.service.MutexExecutionService; |
| 44 | import org.onosproject.store.service.MutexTask; |
| 45 | import org.onosproject.store.service.Serializer; |
| 46 | import org.onosproject.store.service.StorageService; |
| 47 | import org.onosproject.store.service.Versioned; |
| 48 | import org.slf4j.Logger; |
| 49 | |
| 50 | import com.google.common.base.MoreObjects; |
| 51 | import com.google.common.collect.Lists; |
| 52 | import com.google.common.collect.Maps; |
| 53 | |
| 54 | /** |
| 55 | * Implementation of a MutexExecutionService. |
| 56 | */ |
| 57 | @Component(immediate = true) |
| 58 | @Service |
| 59 | public class MutexExecutionManager implements MutexExecutionService { |
| 60 | |
| 61 | private final Logger log = getLogger(getClass()); |
| 62 | |
| 63 | protected ConsistentMap<String, MutexState> lockMap; |
| 64 | protected NodeId localNodeId; |
| 65 | |
| 66 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| 67 | protected ClusterService clusterService; |
| 68 | |
| 69 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| 70 | protected StorageService storageService; |
| 71 | |
| 72 | private final MapEventListener<String, MutexState> mapEventListener = new InternalLockMapEventListener(); |
| 73 | private final ClusterEventListener clusterEventListener = new InternalClusterEventListener(); |
| 74 | |
| 75 | private Map<String, CompletableFuture<MutexState>> pending = Maps.newConcurrentMap(); |
| 76 | private Map<String, InnerMutexTask> activeTasks = Maps.newConcurrentMap(); |
| 77 | |
| 78 | @Activate |
| 79 | public void activate() { |
| 80 | localNodeId = clusterService.getLocalNode().id(); |
| 81 | lockMap = storageService.<String, MutexState>consistentMapBuilder() |
| 82 | .withName("onos-mutexes") |
| 83 | .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API), MutexState.class)) |
| 84 | .withPartitionsDisabled() |
| 85 | .build(); |
| 86 | lockMap.addListener(mapEventListener); |
| 87 | clusterService.addListener(clusterEventListener); |
| 88 | releaseOldLocks(); |
| 89 | log.info("Started"); |
| 90 | } |
| 91 | |
| 92 | @Deactivate |
| 93 | public void deactivate() { |
| 94 | lockMap.removeListener(mapEventListener); |
| 95 | pending.values().forEach(future -> future.cancel(true)); |
| 96 | activeTasks.forEach((k, v) -> { |
| 97 | v.stop(); |
| 98 | unlock(k); |
| 99 | }); |
| 100 | clusterService.removeListener(clusterEventListener); |
| 101 | log.info("Stopped"); |
| 102 | } |
| 103 | |
| 104 | @Override |
| 105 | public CompletableFuture<Void> execute(MutexTask task, String exclusionPath, Executor executor) { |
| 106 | return lock(exclusionPath) |
| 107 | .thenApply(state -> activeTasks.computeIfAbsent(exclusionPath, |
| 108 | k -> new InnerMutexTask(exclusionPath, |
| 109 | task, |
| 110 | state.term()))) |
| 111 | .thenAcceptAsync(t -> t.start(), executor) |
| 112 | .whenComplete((r, e) -> unlock(exclusionPath)); |
| 113 | } |
| 114 | |
| 115 | protected CompletableFuture<MutexState> lock(String exclusionPath) { |
| 116 | CompletableFuture<MutexState> future = |
| 117 | pending.computeIfAbsent(exclusionPath, k -> new CompletableFuture<>()); |
| 118 | tryLock(exclusionPath); |
| 119 | return future; |
| 120 | } |
| 121 | |
| 122 | /** |
| 123 | * Attempts to acquire lock for a path. If lock is held by some other node, adds this node to |
| 124 | * the wait list. |
| 125 | * @param exclusionPath exclusion path |
| 126 | */ |
| 127 | protected void tryLock(String exclusionPath) { |
| 128 | Tools.retryable(() -> lockMap.asJavaMap() |
| 129 | .compute(exclusionPath, |
| 130 | (k, v) -> MutexState.admit(v, localNodeId)), |
| 131 | ConsistentMapException.ConcurrentModification.class, |
| 132 | Integer.MAX_VALUE, |
| 133 | 100).get(); |
| 134 | } |
| 135 | |
| 136 | /** |
| 137 | * Releases lock for the specific path. This operation is idempotent. |
| 138 | * @param exclusionPath exclusion path |
| 139 | */ |
| 140 | protected void unlock(String exclusionPath) { |
| 141 | Tools.retryable(() -> lockMap.asJavaMap() |
| 142 | .compute(exclusionPath, (k, v) -> MutexState.evict(v, localNodeId)), |
| 143 | ConsistentMapException.ConcurrentModification.class, |
| 144 | Integer.MAX_VALUE, |
| 145 | 100).get(); |
| 146 | } |
| 147 | |
| 148 | /** |
| 149 | * Detects and releases all locks held by this node. |
| 150 | */ |
| 151 | private void releaseOldLocks() { |
| 152 | Maps.filterValues(lockMap.asJavaMap(), state -> localNodeId.equals(state.holder())) |
| 153 | .keySet() |
| 154 | .forEach(path -> { |
| 155 | log.info("Detected zombie task still holding lock for {}. Releasing lock.", path); |
| 156 | unlock(path); |
| 157 | }); |
| 158 | } |
| 159 | |
| 160 | private class InternalLockMapEventListener implements MapEventListener<String, MutexState> { |
| 161 | |
| 162 | @Override |
| 163 | public void event(MapEvent<String, MutexState> event) { |
| 164 | log.debug("Received {}", event); |
| 165 | if (event.type() == MapEvent.Type.UPDATE || event.type() == MapEvent.Type.INSERT) { |
| 166 | pending.computeIfPresent(event.key(), (k, future) -> { |
| 167 | MutexState state = Versioned.valueOrElse(event.value(), null); |
| 168 | if (state != null && localNodeId.equals(state.holder())) { |
| 169 | log.debug("Local node is now owner for {}", event.key()); |
| 170 | future.complete(state); |
| 171 | return null; |
| 172 | } else { |
| 173 | return future; |
| 174 | } |
| 175 | }); |
| 176 | InnerMutexTask task = activeTasks.get(event.key()); |
| 177 | if (task != null && task.term() < Versioned.valueOrElse(event.value(), null).term()) { |
| 178 | task.stop(); |
| 179 | } |
| 180 | } |
| 181 | } |
| 182 | } |
| 183 | |
| 184 | private class InternalClusterEventListener implements ClusterEventListener { |
| 185 | |
| 186 | @Override |
| 187 | public void event(ClusterEvent event) { |
| 188 | if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED || |
| 189 | event.type() == ClusterEvent.Type.INSTANCE_REMOVED) { |
| 190 | NodeId nodeId = event.subject().id(); |
| 191 | log.debug("{} is no longer active. Attemping to clean up its locks.", nodeId); |
| 192 | lockMap.asJavaMap().forEach((k, v) -> { |
| 193 | if (v.contains(nodeId)) { |
| 194 | lockMap.compute(k, (path, state) -> MutexState.evict(v, nodeId)); |
| 195 | } |
| 196 | }); |
| 197 | } |
| 198 | long activeNodes = clusterService.getNodes() |
| 199 | .stream() |
| 200 | .map(node -> clusterService.getState(node.id())) |
| 201 | .filter(State.ACTIVE::equals) |
| 202 | .count(); |
| 203 | if (clusterService.getNodes().size() > 1 && activeNodes == 1) { |
| 204 | log.info("This node is partitioned away from the cluster. Stopping all inflight executions"); |
| 205 | activeTasks.forEach((k, v) -> { |
| 206 | v.stop(); |
| 207 | }); |
| 208 | } |
| 209 | } |
| 210 | } |
| 211 | |
| 212 | private static final class MutexState { |
| 213 | |
| 214 | private final NodeId holder; |
| 215 | private final List<NodeId> waitList; |
| 216 | private final long term; |
| 217 | |
| 218 | public static MutexState admit(MutexState state, NodeId nodeId) { |
| 219 | if (state == null) { |
| 220 | return new MutexState(nodeId, 1L, Lists.newArrayList()); |
| 221 | } else if (state.holder() == null) { |
| 222 | return new MutexState(nodeId, state.term() + 1, Lists.newArrayList()); |
| 223 | } else { |
| 224 | if (!state.contains(nodeId)) { |
| 225 | NodeId newHolder = state.holder(); |
| 226 | List<NodeId> newWaitList = Lists.newArrayList(state.waitList()); |
| 227 | newWaitList.add(nodeId); |
| 228 | return new MutexState(newHolder, state.term(), newWaitList); |
| 229 | } else { |
| 230 | return state; |
| 231 | } |
| 232 | } |
| 233 | } |
| 234 | |
| 235 | public static MutexState evict(MutexState state, NodeId nodeId) { |
| 236 | return state.evict(nodeId); |
| 237 | } |
| 238 | |
| 239 | public MutexState evict(NodeId nodeId) { |
| 240 | if (nodeId.equals(holder)) { |
| 241 | if (waitList.isEmpty()) { |
| 242 | return new MutexState(null, term, waitList); |
| 243 | } |
| 244 | List<NodeId> newWaitList = Lists.newArrayList(waitList); |
| 245 | NodeId newHolder = newWaitList.remove(0); |
| 246 | return new MutexState(newHolder, term + 1, newWaitList); |
| 247 | } else { |
| 248 | NodeId newHolder = holder; |
| 249 | List<NodeId> newWaitList = Lists.newArrayList(waitList); |
| 250 | newWaitList.remove(nodeId); |
| 251 | return new MutexState(newHolder, term, newWaitList); |
| 252 | } |
| 253 | } |
| 254 | |
| 255 | public NodeId holder() { |
| 256 | return holder; |
| 257 | } |
| 258 | |
| 259 | public List<NodeId> waitList() { |
| 260 | return waitList; |
| 261 | } |
| 262 | |
| 263 | public long term() { |
| 264 | return term; |
| 265 | } |
| 266 | |
| 267 | private boolean contains(NodeId nodeId) { |
| 268 | return (nodeId.equals(holder) || waitList.contains(nodeId)); |
| 269 | } |
| 270 | |
| 271 | private MutexState(NodeId holder, long term, List<NodeId> waitList) { |
| 272 | this.holder = holder; |
| 273 | this.term = term; |
| 274 | this.waitList = Lists.newArrayList(waitList); |
| 275 | } |
| 276 | |
| 277 | @Override |
| 278 | public String toString() { |
| 279 | return MoreObjects.toStringHelper(getClass()) |
| 280 | .add("holder", holder) |
| 281 | .add("term", term) |
| 282 | .add("waitList", waitList) |
| 283 | .toString(); |
| 284 | } |
| 285 | } |
| 286 | |
| 287 | private class InnerMutexTask implements MutexTask { |
| 288 | private final MutexTask task; |
| 289 | private final String mutexPath; |
| 290 | private final long term; |
| 291 | |
| 292 | public InnerMutexTask(String mutexPath, MutexTask task, long term) { |
| 293 | this.mutexPath = mutexPath; |
| 294 | this.term = term; |
| 295 | this.task = task; |
| 296 | } |
| 297 | |
| 298 | public long term() { |
| 299 | return term; |
| 300 | } |
| 301 | |
| 302 | @Override |
| 303 | public void start() { |
| 304 | log.debug("Starting execution for mutex task guarded by {}", mutexPath); |
| 305 | task.start(); |
| 306 | log.debug("Finished execution for mutex task guarded by {}", mutexPath); |
| 307 | } |
| 308 | |
| 309 | @Override |
| 310 | public void stop() { |
| 311 | log.debug("Stopping execution for mutex task guarded by {}", mutexPath); |
| 312 | task.stop(); |
| 313 | } |
| 314 | } |
| 315 | } |