blob: bc45d683d04c7e2642615bbce1ec137f6295c83a [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
Madan Jampanie5723dc2015-08-31 14:23:58 -070017
18import static org.slf4j.LoggerFactory.getLogger;
19
20import java.util.Arrays;
21import java.util.List;
22import java.util.Map;
23import java.util.concurrent.CompletableFuture;
24import java.util.concurrent.Executor;
25
26import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.onlab.util.Tools;
33import org.onosproject.cluster.ClusterEvent;
34import org.onosproject.cluster.ClusterEventListener;
35import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.ControllerNode.State;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.service.ConsistentMap;
40import org.onosproject.store.service.ConsistentMapException;
41import org.onosproject.store.service.MapEvent;
42import org.onosproject.store.service.MapEventListener;
43import org.onosproject.store.service.MutexExecutionService;
44import org.onosproject.store.service.MutexTask;
45import org.onosproject.store.service.Serializer;
46import org.onosproject.store.service.StorageService;
47import org.onosproject.store.service.Versioned;
48import org.slf4j.Logger;
49
50import com.google.common.base.MoreObjects;
51import com.google.common.collect.Lists;
52import com.google.common.collect.Maps;
Heedo Kang4a47a302016-02-29 17:40:23 +090053import static org.onosproject.security.AppGuard.checkPermission;
54import static org.onosproject.security.AppPermission.Type.MUTEX_WRITE;
Madan Jampanie5723dc2015-08-31 14:23:58 -070055/**
56 * Implementation of a MutexExecutionService.
57 */
58@Component(immediate = true)
59@Service
60public class MutexExecutionManager implements MutexExecutionService {
61
62 private final Logger log = getLogger(getClass());
63
64 protected ConsistentMap<String, MutexState> lockMap;
65 protected NodeId localNodeId;
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 protected ClusterService clusterService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
71 protected StorageService storageService;
72
73 private final MapEventListener<String, MutexState> mapEventListener = new InternalLockMapEventListener();
74 private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
75
76 private Map<String, CompletableFuture<MutexState>> pending = Maps.newConcurrentMap();
77 private Map<String, InnerMutexTask> activeTasks = Maps.newConcurrentMap();
78
79 @Activate
80 public void activate() {
81 localNodeId = clusterService.getLocalNode().id();
82 lockMap = storageService.<String, MutexState>consistentMapBuilder()
83 .withName("onos-mutexes")
84 .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API), MutexState.class))
85 .withPartitionsDisabled()
86 .build();
87 lockMap.addListener(mapEventListener);
88 clusterService.addListener(clusterEventListener);
89 releaseOldLocks();
90 log.info("Started");
91 }
92
93 @Deactivate
94 public void deactivate() {
95 lockMap.removeListener(mapEventListener);
96 pending.values().forEach(future -> future.cancel(true));
97 activeTasks.forEach((k, v) -> {
98 v.stop();
99 unlock(k);
100 });
101 clusterService.removeListener(clusterEventListener);
102 log.info("Stopped");
103 }
104
105 @Override
106 public CompletableFuture<Void> execute(MutexTask task, String exclusionPath, Executor executor) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900107 checkPermission(MUTEX_WRITE);
Madan Jampanie5723dc2015-08-31 14:23:58 -0700108 return lock(exclusionPath)
109 .thenApply(state -> activeTasks.computeIfAbsent(exclusionPath,
110 k -> new InnerMutexTask(exclusionPath,
111 task,
112 state.term())))
113 .thenAcceptAsync(t -> t.start(), executor)
114 .whenComplete((r, e) -> unlock(exclusionPath));
115 }
116
117 protected CompletableFuture<MutexState> lock(String exclusionPath) {
118 CompletableFuture<MutexState> future =
119 pending.computeIfAbsent(exclusionPath, k -> new CompletableFuture<>());
120 tryLock(exclusionPath);
121 return future;
122 }
123
124 /**
125 * Attempts to acquire lock for a path. If lock is held by some other node, adds this node to
126 * the wait list.
127 * @param exclusionPath exclusion path
128 */
129 protected void tryLock(String exclusionPath) {
130 Tools.retryable(() -> lockMap.asJavaMap()
131 .compute(exclusionPath,
132 (k, v) -> MutexState.admit(v, localNodeId)),
133 ConsistentMapException.ConcurrentModification.class,
134 Integer.MAX_VALUE,
135 100).get();
136 }
137
138 /**
139 * Releases lock for the specific path. This operation is idempotent.
140 * @param exclusionPath exclusion path
141 */
142 protected void unlock(String exclusionPath) {
143 Tools.retryable(() -> lockMap.asJavaMap()
144 .compute(exclusionPath, (k, v) -> MutexState.evict(v, localNodeId)),
145 ConsistentMapException.ConcurrentModification.class,
146 Integer.MAX_VALUE,
147 100).get();
148 }
149
150 /**
151 * Detects and releases all locks held by this node.
152 */
153 private void releaseOldLocks() {
154 Maps.filterValues(lockMap.asJavaMap(), state -> localNodeId.equals(state.holder()))
155 .keySet()
156 .forEach(path -> {
157 log.info("Detected zombie task still holding lock for {}. Releasing lock.", path);
158 unlock(path);
159 });
160 }
161
162 private class InternalLockMapEventListener implements MapEventListener<String, MutexState> {
163
164 @Override
165 public void event(MapEvent<String, MutexState> event) {
166 log.debug("Received {}", event);
167 if (event.type() == MapEvent.Type.UPDATE || event.type() == MapEvent.Type.INSERT) {
168 pending.computeIfPresent(event.key(), (k, future) -> {
169 MutexState state = Versioned.valueOrElse(event.value(), null);
170 if (state != null && localNodeId.equals(state.holder())) {
171 log.debug("Local node is now owner for {}", event.key());
172 future.complete(state);
173 return null;
174 } else {
175 return future;
176 }
177 });
178 InnerMutexTask task = activeTasks.get(event.key());
179 if (task != null && task.term() < Versioned.valueOrElse(event.value(), null).term()) {
180 task.stop();
181 }
182 }
183 }
184 }
185
186 private class InternalClusterEventListener implements ClusterEventListener {
187
188 @Override
189 public void event(ClusterEvent event) {
190 if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED ||
191 event.type() == ClusterEvent.Type.INSTANCE_REMOVED) {
192 NodeId nodeId = event.subject().id();
193 log.debug("{} is no longer active. Attemping to clean up its locks.", nodeId);
194 lockMap.asJavaMap().forEach((k, v) -> {
195 if (v.contains(nodeId)) {
196 lockMap.compute(k, (path, state) -> MutexState.evict(v, nodeId));
197 }
198 });
199 }
200 long activeNodes = clusterService.getNodes()
201 .stream()
202 .map(node -> clusterService.getState(node.id()))
Thomas Vachuska7a8de842016-03-07 20:56:35 -0800203 .filter(State::isActive)
Madan Jampanie5723dc2015-08-31 14:23:58 -0700204 .count();
205 if (clusterService.getNodes().size() > 1 && activeNodes == 1) {
206 log.info("This node is partitioned away from the cluster. Stopping all inflight executions");
207 activeTasks.forEach((k, v) -> {
208 v.stop();
209 });
210 }
211 }
212 }
213
214 private static final class MutexState {
215
216 private final NodeId holder;
217 private final List<NodeId> waitList;
218 private final long term;
219
220 public static MutexState admit(MutexState state, NodeId nodeId) {
221 if (state == null) {
222 return new MutexState(nodeId, 1L, Lists.newArrayList());
223 } else if (state.holder() == null) {
224 return new MutexState(nodeId, state.term() + 1, Lists.newArrayList());
225 } else {
226 if (!state.contains(nodeId)) {
227 NodeId newHolder = state.holder();
228 List<NodeId> newWaitList = Lists.newArrayList(state.waitList());
229 newWaitList.add(nodeId);
230 return new MutexState(newHolder, state.term(), newWaitList);
231 } else {
232 return state;
233 }
234 }
235 }
236
237 public static MutexState evict(MutexState state, NodeId nodeId) {
238 return state.evict(nodeId);
239 }
240
241 public MutexState evict(NodeId nodeId) {
242 if (nodeId.equals(holder)) {
243 if (waitList.isEmpty()) {
244 return new MutexState(null, term, waitList);
245 }
246 List<NodeId> newWaitList = Lists.newArrayList(waitList);
247 NodeId newHolder = newWaitList.remove(0);
248 return new MutexState(newHolder, term + 1, newWaitList);
249 } else {
250 NodeId newHolder = holder;
251 List<NodeId> newWaitList = Lists.newArrayList(waitList);
252 newWaitList.remove(nodeId);
253 return new MutexState(newHolder, term, newWaitList);
254 }
255 }
256
257 public NodeId holder() {
258 return holder;
259 }
260
261 public List<NodeId> waitList() {
262 return waitList;
263 }
264
265 public long term() {
266 return term;
267 }
268
269 private boolean contains(NodeId nodeId) {
270 return (nodeId.equals(holder) || waitList.contains(nodeId));
271 }
272
273 private MutexState(NodeId holder, long term, List<NodeId> waitList) {
274 this.holder = holder;
275 this.term = term;
276 this.waitList = Lists.newArrayList(waitList);
277 }
278
279 @Override
280 public String toString() {
281 return MoreObjects.toStringHelper(getClass())
282 .add("holder", holder)
283 .add("term", term)
284 .add("waitList", waitList)
285 .toString();
286 }
287 }
288
289 private class InnerMutexTask implements MutexTask {
290 private final MutexTask task;
291 private final String mutexPath;
292 private final long term;
293
294 public InnerMutexTask(String mutexPath, MutexTask task, long term) {
295 this.mutexPath = mutexPath;
296 this.term = term;
297 this.task = task;
298 }
299
300 public long term() {
301 return term;
302 }
303
304 @Override
305 public void start() {
306 log.debug("Starting execution for mutex task guarded by {}", mutexPath);
307 task.start();
308 log.debug("Finished execution for mutex task guarded by {}", mutexPath);
309 }
310
311 @Override
312 public void stop() {
313 log.debug("Stopping execution for mutex task guarded by {}", mutexPath);
314 task.stop();
315 }
316 }
317}