blob: 26f4b677ef08ec5d2c2cefb2ea26914250d3697e [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.service.impl;
Madan Jampani12390c12014-11-12 00:35:56 -080017
Madan Jampania88d1f52014-11-14 16:45:24 -080018import static org.onlab.util.Tools.namedThreads;
Madan Jampani12390c12014-11-12 00:35:56 -080019import static org.slf4j.LoggerFactory.getLogger;
20
21import java.util.Iterator;
22import java.util.List;
Madan Jampaniac201952014-11-18 10:06:01 -080023import java.util.Set;
Madan Jampani12390c12014-11-12 00:35:56 -080024import java.util.concurrent.CompletableFuture;
Madan Jampania88d1f52014-11-14 16:45:24 -080025import java.util.concurrent.ExecutorService;
26import java.util.concurrent.Executors;
Madan Jampani12390c12014-11-12 00:35:56 -080027
28import org.apache.felix.scr.annotations.Activate;
29import org.apache.felix.scr.annotations.Component;
30import org.apache.felix.scr.annotations.Deactivate;
31import org.apache.felix.scr.annotations.Reference;
32import org.apache.felix.scr.annotations.ReferenceCardinality;
33import org.apache.felix.scr.annotations.Service;
34import org.joda.time.DateTime;
Brian O'Connorabafb502014-12-02 22:26:20 -080035import org.onosproject.cluster.ClusterService;
36import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
37import org.onosproject.store.cluster.messaging.ClusterMessage;
38import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
39import org.onosproject.store.service.DatabaseAdminService;
40import org.onosproject.store.service.DatabaseException;
41import org.onosproject.store.service.DatabaseService;
42import org.onosproject.store.service.Lock;
43import org.onosproject.store.service.LockEventListener;
44import org.onosproject.store.service.LockService;
Madan Jampani12390c12014-11-12 00:35:56 -080045import org.slf4j.Logger;
46
Madan Jampania88d1f52014-11-14 16:45:24 -080047import com.google.common.collect.LinkedListMultimap;
48import com.google.common.collect.ListMultimap;
49import com.google.common.collect.Multimaps;
Madan Jampani12390c12014-11-12 00:35:56 -080050
Brian O'Connor5d55ed42014-12-01 18:27:47 -080051@Component(immediate = false)
Madan Jampani12390c12014-11-12 00:35:56 -080052@Service
53public class DistributedLockManager implements LockService {
54
Madan Jampania88d1f52014-11-14 16:45:24 -080055 private static final ExecutorService THREAD_POOL =
56 Executors.newCachedThreadPool(namedThreads("lock-manager-%d"));
57
Madan Jampani12390c12014-11-12 00:35:56 -080058 private final Logger log = getLogger(getClass());
59
60 public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
61
Madan Jampaniac201952014-11-18 10:06:01 -080062 public static final int DEAD_LOCK_TIMEOUT_MS = 5000;
63
Madan Jampania88d1f52014-11-14 16:45:24 -080064 private final ListMultimap<String, LockRequest> locksToAcquire =
65 Multimaps.synchronizedListMultimap(LinkedListMultimap.<String, LockRequest>create());
Madan Jampani12390c12014-11-12 00:35:56 -080066
67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 private ClusterCommunicationService clusterCommunicator;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampaniac201952014-11-18 10:06:01 -080071 private DatabaseAdminService databaseAdminService;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani12390c12014-11-12 00:35:56 -080074 private DatabaseService databaseService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77 private ClusterService clusterService;
78
79 @Activate
80 public void activate() {
Madan Jampaniac201952014-11-18 10:06:01 -080081 try {
Madan Jampani71582ed2014-11-18 10:06:01 -080082 Set<String> tables = databaseAdminService.listTables();
83
84 if (!tables.contains(ONOS_LOCK_TABLE_NAME)) {
Madan Jampaniac201952014-11-18 10:06:01 -080085 if (databaseAdminService.createTable(ONOS_LOCK_TABLE_NAME, DEAD_LOCK_TIMEOUT_MS)) {
86 log.info("Created {} table.", ONOS_LOCK_TABLE_NAME);
87 }
88 }
89 } catch (DatabaseException e) {
90 log.error("DistributedLockManager#activate failed.", e);
Madan Jampaniac201952014-11-18 10:06:01 -080091 }
92
Madan Jampani71582ed2014-11-18 10:06:01 -080093 clusterCommunicator.addSubscriber(
94 DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
95 new LockEventMessageListener());
Madan Jampaniac201952014-11-18 10:06:01 -080096
Madan Jampani71582ed2014-11-18 10:06:01 -080097 log.info("Started");
Madan Jampani12390c12014-11-12 00:35:56 -080098 }
99
100 @Deactivate
101 public void deactivate() {
Madan Jampania88d1f52014-11-14 16:45:24 -0800102 clusterCommunicator.removeSubscriber(DatabaseStateMachine.DATABASE_UPDATE_EVENTS);
Madan Jampani12390c12014-11-12 00:35:56 -0800103 locksToAcquire.clear();
Madan Jampania88d1f52014-11-14 16:45:24 -0800104 log.info("Stopped.");
Madan Jampani12390c12014-11-12 00:35:56 -0800105 }
106
107 @Override
108 public Lock create(String path) {
Madan Jampani9b37d572014-11-12 11:53:24 -0800109 return new DistributedLock(path, databaseService, clusterService, this);
Madan Jampani12390c12014-11-12 00:35:56 -0800110 }
111
112 @Override
113 public void addListener(LockEventListener listener) {
Madan Jampani12390c12014-11-12 00:35:56 -0800114 throw new UnsupportedOperationException();
115 }
116
117 @Override
118 public void removeListener(LockEventListener listener) {
Madan Jampani12390c12014-11-12 00:35:56 -0800119 throw new UnsupportedOperationException();
120 }
121
Madan Jampania88d1f52014-11-14 16:45:24 -0800122 /**
123 * Attempts to acquire the lock as soon as it becomes available.
124 * @param lock lock to acquire.
125 * @param waitTimeMillis maximum time to wait before giving up.
126 * @param leaseDurationMillis the duration for which to acquire the lock initially.
Madan Jampani1769a1a2014-11-19 21:51:44 -0800127 * @return Future that can be blocked on until lock becomes available.
Madan Jampania88d1f52014-11-14 16:45:24 -0800128 */
Madan Jampani1769a1a2014-11-19 21:51:44 -0800129 protected CompletableFuture<Void> lockIfAvailable(
Madan Jampania88d1f52014-11-14 16:45:24 -0800130 Lock lock,
Madan Jampani1769a1a2014-11-19 21:51:44 -0800131 int waitTimeMillis,
Madan Jampania88d1f52014-11-14 16:45:24 -0800132 int leaseDurationMillis) {
Madan Jampani1769a1a2014-11-19 21:51:44 -0800133 CompletableFuture<Void> future = new CompletableFuture<>();
Madan Jampani71582ed2014-11-18 10:06:01 -0800134 LockRequest request = new LockRequest(
135 lock,
136 leaseDurationMillis,
Madan Jampani1769a1a2014-11-19 21:51:44 -0800137 DateTime.now().plusMillis(waitTimeMillis),
Madan Jampani71582ed2014-11-18 10:06:01 -0800138 future);
139 locksToAcquire.put(lock.path(), request);
140 return future;
141 }
142
143 /**
144 * Attempts to acquire the lock as soon as it becomes available.
145 * @param lock lock to acquire.
146 * @param leaseDurationMillis the duration for which to acquire the lock initially.
147 * @return Future lease expiration date.
148 */
Madan Jampani1769a1a2014-11-19 21:51:44 -0800149 protected CompletableFuture<Void> lockIfAvailable(
Madan Jampani71582ed2014-11-18 10:06:01 -0800150 Lock lock,
151 int leaseDurationMillis) {
Madan Jampani1769a1a2014-11-19 21:51:44 -0800152 CompletableFuture<Void> future = new CompletableFuture<>();
Madan Jampani71582ed2014-11-18 10:06:01 -0800153 LockRequest request = new LockRequest(
154 lock,
155 leaseDurationMillis,
156 DateTime.now().plusYears(100),
157 future);
158 locksToAcquire.put(lock.path(), request);
Madan Jampani12390c12014-11-12 00:35:56 -0800159 return future;
160 }
161
162 private class LockEventMessageListener implements ClusterMessageHandler {
163 @Override
164 public void handle(ClusterMessage message) {
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800165 TableModificationEvent event = ClusterMessagingProtocol.DB_SERIALIZER
Madan Jampani9b37d572014-11-12 11:53:24 -0800166 .decode(message.payload());
Madan Jampania88d1f52014-11-14 16:45:24 -0800167 if (event.tableName().equals(ONOS_LOCK_TABLE_NAME) &&
168 event.type().equals(TableModificationEvent.Type.ROW_DELETED)) {
169 THREAD_POOL.submit(new RetryLockTask(event.key()));
Madan Jampani12390c12014-11-12 00:35:56 -0800170 }
Madan Jampania88d1f52014-11-14 16:45:24 -0800171 }
172 }
Madan Jampani12390c12014-11-12 00:35:56 -0800173
Madan Jampania88d1f52014-11-14 16:45:24 -0800174 private class RetryLockTask implements Runnable {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800175
Madan Jampania88d1f52014-11-14 16:45:24 -0800176 private final String path;
177
178 public RetryLockTask(String path) {
179 this.path = path;
180 }
181
182 @Override
183 public void run() {
Madan Jampani12390c12014-11-12 00:35:56 -0800184 if (!locksToAcquire.containsKey(path)) {
185 return;
186 }
187
Madan Jampania88d1f52014-11-14 16:45:24 -0800188 List<LockRequest> existingRequests = locksToAcquire.get(path);
189 if (existingRequests == null || existingRequests.isEmpty()) {
190 return;
191 }
192 log.info("Path {} is now available for locking. There are {} outstanding "
193 + "requests for it.",
194 path, existingRequests.size());
Madan Jampani12390c12014-11-12 00:35:56 -0800195
Madan Jampania88d1f52014-11-14 16:45:24 -0800196 synchronized (existingRequests) {
197 Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
198 while (existingRequestIterator.hasNext()) {
199 LockRequest request = existingRequestIterator.next();
200 if (DateTime.now().isAfter(request.requestExpirationTime())) {
201 // request expired.
202 existingRequestIterator.remove();
203 } else {
204 if (request.lock().tryLock(request.leaseDurationMillis())) {
Madan Jampani1769a1a2014-11-19 21:51:44 -0800205 request.future().complete(null);
Madan Jampani9b37d572014-11-12 11:53:24 -0800206 existingRequestIterator.remove();
Madan Jampani12390c12014-11-12 00:35:56 -0800207 }
208 }
209 }
210 }
211 }
212 }
213
214 private class LockRequest {
215
216 private final Lock lock;
Madan Jampania88d1f52014-11-14 16:45:24 -0800217 private final DateTime requestExpirationTime;
Madan Jampani12390c12014-11-12 00:35:56 -0800218 private final int leaseDurationMillis;
Madan Jampani1769a1a2014-11-19 21:51:44 -0800219 private final CompletableFuture<Void> future;
Madan Jampani12390c12014-11-12 00:35:56 -0800220
Madan Jampani71582ed2014-11-18 10:06:01 -0800221 public LockRequest(
222 Lock lock,
223 int leaseDurationMillis,
224 DateTime requestExpirationTime,
Madan Jampani1769a1a2014-11-19 21:51:44 -0800225 CompletableFuture<Void> future) {
Madan Jampani12390c12014-11-12 00:35:56 -0800226
227 this.lock = lock;
Madan Jampani71582ed2014-11-18 10:06:01 -0800228 this.requestExpirationTime = requestExpirationTime;
Madan Jampani12390c12014-11-12 00:35:56 -0800229 this.leaseDurationMillis = leaseDurationMillis;
230 this.future = future;
231 }
232
233 public Lock lock() {
234 return lock;
235 }
236
Madan Jampania88d1f52014-11-14 16:45:24 -0800237 public DateTime requestExpirationTime() {
238 return requestExpirationTime;
Madan Jampani12390c12014-11-12 00:35:56 -0800239 }
240
241 public int leaseDurationMillis() {
242 return leaseDurationMillis;
243 }
244
Madan Jampani1769a1a2014-11-19 21:51:44 -0800245 public CompletableFuture<Void> future() {
Madan Jampani12390c12014-11-12 00:35:56 -0800246 return future;
247 }
248 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800249}