blob: 5946fdb51776cbf6dee0510431fb3f87723842a7 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Madan Jampanif4c88502016-01-21 12:35:36 -080016package org.onosproject.store.primitives.impl;
Madan Jampanie5723dc2015-08-31 14:23:58 -070017
18import static org.slf4j.LoggerFactory.getLogger;
19
20import java.util.Arrays;
21import java.util.List;
22import java.util.Map;
23import java.util.concurrent.CompletableFuture;
24import java.util.concurrent.Executor;
25
26import org.apache.felix.scr.annotations.Activate;
27import org.apache.felix.scr.annotations.Component;
28import org.apache.felix.scr.annotations.Deactivate;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.onlab.util.Tools;
33import org.onosproject.cluster.ClusterEvent;
34import org.onosproject.cluster.ClusterEventListener;
35import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.ControllerNode.State;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.service.ConsistentMap;
40import org.onosproject.store.service.ConsistentMapException;
41import org.onosproject.store.service.MapEvent;
42import org.onosproject.store.service.MapEventListener;
43import org.onosproject.store.service.MutexExecutionService;
44import org.onosproject.store.service.MutexTask;
45import org.onosproject.store.service.Serializer;
46import org.onosproject.store.service.StorageService;
47import org.onosproject.store.service.Versioned;
48import org.slf4j.Logger;
49
50import com.google.common.base.MoreObjects;
51import com.google.common.collect.Lists;
52import com.google.common.collect.Maps;
53
54/**
55 * Implementation of a MutexExecutionService.
56 */
57@Component(immediate = true)
58@Service
59public class MutexExecutionManager implements MutexExecutionService {
60
61 private final Logger log = getLogger(getClass());
62
63 protected ConsistentMap<String, MutexState> lockMap;
64 protected NodeId localNodeId;
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 protected ClusterService clusterService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 protected StorageService storageService;
71
72 private final MapEventListener<String, MutexState> mapEventListener = new InternalLockMapEventListener();
73 private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
74
75 private Map<String, CompletableFuture<MutexState>> pending = Maps.newConcurrentMap();
76 private Map<String, InnerMutexTask> activeTasks = Maps.newConcurrentMap();
77
78 @Activate
79 public void activate() {
80 localNodeId = clusterService.getLocalNode().id();
81 lockMap = storageService.<String, MutexState>consistentMapBuilder()
82 .withName("onos-mutexes")
83 .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API), MutexState.class))
84 .withPartitionsDisabled()
85 .build();
86 lockMap.addListener(mapEventListener);
87 clusterService.addListener(clusterEventListener);
88 releaseOldLocks();
89 log.info("Started");
90 }
91
92 @Deactivate
93 public void deactivate() {
94 lockMap.removeListener(mapEventListener);
95 pending.values().forEach(future -> future.cancel(true));
96 activeTasks.forEach((k, v) -> {
97 v.stop();
98 unlock(k);
99 });
100 clusterService.removeListener(clusterEventListener);
101 log.info("Stopped");
102 }
103
104 @Override
105 public CompletableFuture<Void> execute(MutexTask task, String exclusionPath, Executor executor) {
106 return lock(exclusionPath)
107 .thenApply(state -> activeTasks.computeIfAbsent(exclusionPath,
108 k -> new InnerMutexTask(exclusionPath,
109 task,
110 state.term())))
111 .thenAcceptAsync(t -> t.start(), executor)
112 .whenComplete((r, e) -> unlock(exclusionPath));
113 }
114
115 protected CompletableFuture<MutexState> lock(String exclusionPath) {
116 CompletableFuture<MutexState> future =
117 pending.computeIfAbsent(exclusionPath, k -> new CompletableFuture<>());
118 tryLock(exclusionPath);
119 return future;
120 }
121
122 /**
123 * Attempts to acquire lock for a path. If lock is held by some other node, adds this node to
124 * the wait list.
125 * @param exclusionPath exclusion path
126 */
127 protected void tryLock(String exclusionPath) {
128 Tools.retryable(() -> lockMap.asJavaMap()
129 .compute(exclusionPath,
130 (k, v) -> MutexState.admit(v, localNodeId)),
131 ConsistentMapException.ConcurrentModification.class,
132 Integer.MAX_VALUE,
133 100).get();
134 }
135
136 /**
137 * Releases lock for the specific path. This operation is idempotent.
138 * @param exclusionPath exclusion path
139 */
140 protected void unlock(String exclusionPath) {
141 Tools.retryable(() -> lockMap.asJavaMap()
142 .compute(exclusionPath, (k, v) -> MutexState.evict(v, localNodeId)),
143 ConsistentMapException.ConcurrentModification.class,
144 Integer.MAX_VALUE,
145 100).get();
146 }
147
148 /**
149 * Detects and releases all locks held by this node.
150 */
151 private void releaseOldLocks() {
152 Maps.filterValues(lockMap.asJavaMap(), state -> localNodeId.equals(state.holder()))
153 .keySet()
154 .forEach(path -> {
155 log.info("Detected zombie task still holding lock for {}. Releasing lock.", path);
156 unlock(path);
157 });
158 }
159
160 private class InternalLockMapEventListener implements MapEventListener<String, MutexState> {
161
162 @Override
163 public void event(MapEvent<String, MutexState> event) {
164 log.debug("Received {}", event);
165 if (event.type() == MapEvent.Type.UPDATE || event.type() == MapEvent.Type.INSERT) {
166 pending.computeIfPresent(event.key(), (k, future) -> {
167 MutexState state = Versioned.valueOrElse(event.value(), null);
168 if (state != null && localNodeId.equals(state.holder())) {
169 log.debug("Local node is now owner for {}", event.key());
170 future.complete(state);
171 return null;
172 } else {
173 return future;
174 }
175 });
176 InnerMutexTask task = activeTasks.get(event.key());
177 if (task != null && task.term() < Versioned.valueOrElse(event.value(), null).term()) {
178 task.stop();
179 }
180 }
181 }
182 }
183
184 private class InternalClusterEventListener implements ClusterEventListener {
185
186 @Override
187 public void event(ClusterEvent event) {
188 if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED ||
189 event.type() == ClusterEvent.Type.INSTANCE_REMOVED) {
190 NodeId nodeId = event.subject().id();
191 log.debug("{} is no longer active. Attemping to clean up its locks.", nodeId);
192 lockMap.asJavaMap().forEach((k, v) -> {
193 if (v.contains(nodeId)) {
194 lockMap.compute(k, (path, state) -> MutexState.evict(v, nodeId));
195 }
196 });
197 }
198 long activeNodes = clusterService.getNodes()
199 .stream()
200 .map(node -> clusterService.getState(node.id()))
201 .filter(State.ACTIVE::equals)
202 .count();
203 if (clusterService.getNodes().size() > 1 && activeNodes == 1) {
204 log.info("This node is partitioned away from the cluster. Stopping all inflight executions");
205 activeTasks.forEach((k, v) -> {
206 v.stop();
207 });
208 }
209 }
210 }
211
212 private static final class MutexState {
213
214 private final NodeId holder;
215 private final List<NodeId> waitList;
216 private final long term;
217
218 public static MutexState admit(MutexState state, NodeId nodeId) {
219 if (state == null) {
220 return new MutexState(nodeId, 1L, Lists.newArrayList());
221 } else if (state.holder() == null) {
222 return new MutexState(nodeId, state.term() + 1, Lists.newArrayList());
223 } else {
224 if (!state.contains(nodeId)) {
225 NodeId newHolder = state.holder();
226 List<NodeId> newWaitList = Lists.newArrayList(state.waitList());
227 newWaitList.add(nodeId);
228 return new MutexState(newHolder, state.term(), newWaitList);
229 } else {
230 return state;
231 }
232 }
233 }
234
235 public static MutexState evict(MutexState state, NodeId nodeId) {
236 return state.evict(nodeId);
237 }
238
239 public MutexState evict(NodeId nodeId) {
240 if (nodeId.equals(holder)) {
241 if (waitList.isEmpty()) {
242 return new MutexState(null, term, waitList);
243 }
244 List<NodeId> newWaitList = Lists.newArrayList(waitList);
245 NodeId newHolder = newWaitList.remove(0);
246 return new MutexState(newHolder, term + 1, newWaitList);
247 } else {
248 NodeId newHolder = holder;
249 List<NodeId> newWaitList = Lists.newArrayList(waitList);
250 newWaitList.remove(nodeId);
251 return new MutexState(newHolder, term, newWaitList);
252 }
253 }
254
255 public NodeId holder() {
256 return holder;
257 }
258
259 public List<NodeId> waitList() {
260 return waitList;
261 }
262
263 public long term() {
264 return term;
265 }
266
267 private boolean contains(NodeId nodeId) {
268 return (nodeId.equals(holder) || waitList.contains(nodeId));
269 }
270
271 private MutexState(NodeId holder, long term, List<NodeId> waitList) {
272 this.holder = holder;
273 this.term = term;
274 this.waitList = Lists.newArrayList(waitList);
275 }
276
277 @Override
278 public String toString() {
279 return MoreObjects.toStringHelper(getClass())
280 .add("holder", holder)
281 .add("term", term)
282 .add("waitList", waitList)
283 .toString();
284 }
285 }
286
287 private class InnerMutexTask implements MutexTask {
288 private final MutexTask task;
289 private final String mutexPath;
290 private final long term;
291
292 public InnerMutexTask(String mutexPath, MutexTask task, long term) {
293 this.mutexPath = mutexPath;
294 this.term = term;
295 this.task = task;
296 }
297
298 public long term() {
299 return term;
300 }
301
302 @Override
303 public void start() {
304 log.debug("Starting execution for mutex task guarded by {}", mutexPath);
305 task.start();
306 log.debug("Finished execution for mutex task guarded by {}", mutexPath);
307 }
308
309 @Override
310 public void stop() {
311 log.debug("Stopping execution for mutex task guarded by {}", mutexPath);
312 task.stop();
313 }
314 }
315}