blob: 681cd2d413d19a5b3286afeb20b37d88f952a547 [file] [log] [blame]
Madan Jampani12390c12014-11-12 00:35:56 -08001package org.onlab.onos.store.service.impl;
2
Madan Jampania88d1f52014-11-14 16:45:24 -08003import static org.onlab.util.Tools.namedThreads;
Madan Jampani12390c12014-11-12 00:35:56 -08004import static org.slf4j.LoggerFactory.getLogger;
5
6import java.util.Iterator;
7import java.util.List;
Madan Jampaniac201952014-11-18 10:06:01 -08008import java.util.Set;
Madan Jampani12390c12014-11-12 00:35:56 -08009import java.util.concurrent.CompletableFuture;
Madan Jampania88d1f52014-11-14 16:45:24 -080010import java.util.concurrent.ExecutorService;
11import java.util.concurrent.Executors;
Madan Jampani12390c12014-11-12 00:35:56 -080012
13import org.apache.felix.scr.annotations.Activate;
14import org.apache.felix.scr.annotations.Component;
15import org.apache.felix.scr.annotations.Deactivate;
16import org.apache.felix.scr.annotations.Reference;
17import org.apache.felix.scr.annotations.ReferenceCardinality;
18import org.apache.felix.scr.annotations.Service;
19import org.joda.time.DateTime;
20import org.onlab.onos.cluster.ClusterService;
21import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
22import org.onlab.onos.store.cluster.messaging.ClusterMessage;
23import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampaniac201952014-11-18 10:06:01 -080024import org.onlab.onos.store.service.DatabaseAdminService;
25import org.onlab.onos.store.service.DatabaseException;
Madan Jampani12390c12014-11-12 00:35:56 -080026import org.onlab.onos.store.service.DatabaseService;
27import org.onlab.onos.store.service.Lock;
28import org.onlab.onos.store.service.LockEventListener;
29import org.onlab.onos.store.service.LockService;
30import org.slf4j.Logger;
31
Madan Jampania88d1f52014-11-14 16:45:24 -080032import com.google.common.collect.LinkedListMultimap;
33import com.google.common.collect.ListMultimap;
34import com.google.common.collect.Multimaps;
Madan Jampani12390c12014-11-12 00:35:56 -080035
Brian O'Connor5d55ed42014-12-01 18:27:47 -080036@Component(immediate = false)
Madan Jampani12390c12014-11-12 00:35:56 -080037@Service
38public class DistributedLockManager implements LockService {
39
Madan Jampania88d1f52014-11-14 16:45:24 -080040 private static final ExecutorService THREAD_POOL =
41 Executors.newCachedThreadPool(namedThreads("lock-manager-%d"));
42
Madan Jampani12390c12014-11-12 00:35:56 -080043 private final Logger log = getLogger(getClass());
44
45 public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
46
Madan Jampaniac201952014-11-18 10:06:01 -080047 public static final int DEAD_LOCK_TIMEOUT_MS = 5000;
48
Madan Jampania88d1f52014-11-14 16:45:24 -080049 private final ListMultimap<String, LockRequest> locksToAcquire =
50 Multimaps.synchronizedListMultimap(LinkedListMultimap.<String, LockRequest>create());
Madan Jampani12390c12014-11-12 00:35:56 -080051
52 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
53 private ClusterCommunicationService clusterCommunicator;
54
55 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampaniac201952014-11-18 10:06:01 -080056 private DatabaseAdminService databaseAdminService;
57
58 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani12390c12014-11-12 00:35:56 -080059 private DatabaseService databaseService;
60
61 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
62 private ClusterService clusterService;
63
64 @Activate
65 public void activate() {
Madan Jampaniac201952014-11-18 10:06:01 -080066 try {
Madan Jampani71582ed2014-11-18 10:06:01 -080067 Set<String> tables = databaseAdminService.listTables();
68
69 if (!tables.contains(ONOS_LOCK_TABLE_NAME)) {
Madan Jampaniac201952014-11-18 10:06:01 -080070 if (databaseAdminService.createTable(ONOS_LOCK_TABLE_NAME, DEAD_LOCK_TIMEOUT_MS)) {
71 log.info("Created {} table.", ONOS_LOCK_TABLE_NAME);
72 }
73 }
74 } catch (DatabaseException e) {
75 log.error("DistributedLockManager#activate failed.", e);
Madan Jampaniac201952014-11-18 10:06:01 -080076 }
77
Madan Jampani71582ed2014-11-18 10:06:01 -080078 clusterCommunicator.addSubscriber(
79 DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
80 new LockEventMessageListener());
Madan Jampaniac201952014-11-18 10:06:01 -080081
Madan Jampani71582ed2014-11-18 10:06:01 -080082 log.info("Started");
Madan Jampani12390c12014-11-12 00:35:56 -080083 }
84
85 @Deactivate
86 public void deactivate() {
Madan Jampania88d1f52014-11-14 16:45:24 -080087 clusterCommunicator.removeSubscriber(DatabaseStateMachine.DATABASE_UPDATE_EVENTS);
Madan Jampani12390c12014-11-12 00:35:56 -080088 locksToAcquire.clear();
Madan Jampania88d1f52014-11-14 16:45:24 -080089 log.info("Stopped.");
Madan Jampani12390c12014-11-12 00:35:56 -080090 }
91
92 @Override
93 public Lock create(String path) {
Madan Jampani9b37d572014-11-12 11:53:24 -080094 return new DistributedLock(path, databaseService, clusterService, this);
Madan Jampani12390c12014-11-12 00:35:56 -080095 }
96
97 @Override
98 public void addListener(LockEventListener listener) {
99 // FIXME:
100 throw new UnsupportedOperationException();
101 }
102
103 @Override
104 public void removeListener(LockEventListener listener) {
105 // FIXME:
106 throw new UnsupportedOperationException();
107 }
108
Madan Jampania88d1f52014-11-14 16:45:24 -0800109 /**
110 * Attempts to acquire the lock as soon as it becomes available.
111 * @param lock lock to acquire.
112 * @param waitTimeMillis maximum time to wait before giving up.
113 * @param leaseDurationMillis the duration for which to acquire the lock initially.
Madan Jampani1769a1a2014-11-19 21:51:44 -0800114 * @return Future that can be blocked on until lock becomes available.
Madan Jampania88d1f52014-11-14 16:45:24 -0800115 */
Madan Jampani1769a1a2014-11-19 21:51:44 -0800116 protected CompletableFuture<Void> lockIfAvailable(
Madan Jampania88d1f52014-11-14 16:45:24 -0800117 Lock lock,
Madan Jampani1769a1a2014-11-19 21:51:44 -0800118 int waitTimeMillis,
Madan Jampania88d1f52014-11-14 16:45:24 -0800119 int leaseDurationMillis) {
Madan Jampani1769a1a2014-11-19 21:51:44 -0800120 CompletableFuture<Void> future = new CompletableFuture<>();
Madan Jampani71582ed2014-11-18 10:06:01 -0800121 LockRequest request = new LockRequest(
122 lock,
123 leaseDurationMillis,
Madan Jampani1769a1a2014-11-19 21:51:44 -0800124 DateTime.now().plusMillis(waitTimeMillis),
Madan Jampani71582ed2014-11-18 10:06:01 -0800125 future);
126 locksToAcquire.put(lock.path(), request);
127 return future;
128 }
129
130 /**
131 * Attempts to acquire the lock as soon as it becomes available.
132 * @param lock lock to acquire.
133 * @param leaseDurationMillis the duration for which to acquire the lock initially.
134 * @return Future lease expiration date.
135 */
Madan Jampani1769a1a2014-11-19 21:51:44 -0800136 protected CompletableFuture<Void> lockIfAvailable(
Madan Jampani71582ed2014-11-18 10:06:01 -0800137 Lock lock,
138 int leaseDurationMillis) {
Madan Jampani1769a1a2014-11-19 21:51:44 -0800139 CompletableFuture<Void> future = new CompletableFuture<>();
Madan Jampani71582ed2014-11-18 10:06:01 -0800140 LockRequest request = new LockRequest(
141 lock,
142 leaseDurationMillis,
143 DateTime.now().plusYears(100),
144 future);
145 locksToAcquire.put(lock.path(), request);
Madan Jampani12390c12014-11-12 00:35:56 -0800146 return future;
147 }
148
149 private class LockEventMessageListener implements ClusterMessageHandler {
150 @Override
151 public void handle(ClusterMessage message) {
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800152 TableModificationEvent event = ClusterMessagingProtocol.DB_SERIALIZER
Madan Jampani9b37d572014-11-12 11:53:24 -0800153 .decode(message.payload());
Madan Jampania88d1f52014-11-14 16:45:24 -0800154 if (event.tableName().equals(ONOS_LOCK_TABLE_NAME) &&
155 event.type().equals(TableModificationEvent.Type.ROW_DELETED)) {
156 THREAD_POOL.submit(new RetryLockTask(event.key()));
Madan Jampani12390c12014-11-12 00:35:56 -0800157 }
Madan Jampania88d1f52014-11-14 16:45:24 -0800158 }
159 }
Madan Jampani12390c12014-11-12 00:35:56 -0800160
Madan Jampania88d1f52014-11-14 16:45:24 -0800161 private class RetryLockTask implements Runnable {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800162
Madan Jampania88d1f52014-11-14 16:45:24 -0800163 private final String path;
164
165 public RetryLockTask(String path) {
166 this.path = path;
167 }
168
169 @Override
170 public void run() {
Madan Jampani12390c12014-11-12 00:35:56 -0800171 if (!locksToAcquire.containsKey(path)) {
172 return;
173 }
174
Madan Jampania88d1f52014-11-14 16:45:24 -0800175 List<LockRequest> existingRequests = locksToAcquire.get(path);
176 if (existingRequests == null || existingRequests.isEmpty()) {
177 return;
178 }
179 log.info("Path {} is now available for locking. There are {} outstanding "
180 + "requests for it.",
181 path, existingRequests.size());
Madan Jampani12390c12014-11-12 00:35:56 -0800182
Madan Jampania88d1f52014-11-14 16:45:24 -0800183 synchronized (existingRequests) {
184 Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
185 while (existingRequestIterator.hasNext()) {
186 LockRequest request = existingRequestIterator.next();
187 if (DateTime.now().isAfter(request.requestExpirationTime())) {
188 // request expired.
189 existingRequestIterator.remove();
190 } else {
191 if (request.lock().tryLock(request.leaseDurationMillis())) {
Madan Jampani1769a1a2014-11-19 21:51:44 -0800192 request.future().complete(null);
Madan Jampani9b37d572014-11-12 11:53:24 -0800193 existingRequestIterator.remove();
Madan Jampani12390c12014-11-12 00:35:56 -0800194 }
195 }
196 }
197 }
198 }
199 }
200
201 private class LockRequest {
202
203 private final Lock lock;
Madan Jampania88d1f52014-11-14 16:45:24 -0800204 private final DateTime requestExpirationTime;
Madan Jampani12390c12014-11-12 00:35:56 -0800205 private final int leaseDurationMillis;
Madan Jampani1769a1a2014-11-19 21:51:44 -0800206 private final CompletableFuture<Void> future;
Madan Jampani12390c12014-11-12 00:35:56 -0800207
Madan Jampani71582ed2014-11-18 10:06:01 -0800208 public LockRequest(
209 Lock lock,
210 int leaseDurationMillis,
211 DateTime requestExpirationTime,
Madan Jampani1769a1a2014-11-19 21:51:44 -0800212 CompletableFuture<Void> future) {
Madan Jampani12390c12014-11-12 00:35:56 -0800213
214 this.lock = lock;
Madan Jampani71582ed2014-11-18 10:06:01 -0800215 this.requestExpirationTime = requestExpirationTime;
Madan Jampani12390c12014-11-12 00:35:56 -0800216 this.leaseDurationMillis = leaseDurationMillis;
217 this.future = future;
218 }
219
220 public Lock lock() {
221 return lock;
222 }
223
Madan Jampania88d1f52014-11-14 16:45:24 -0800224 public DateTime requestExpirationTime() {
225 return requestExpirationTime;
Madan Jampani12390c12014-11-12 00:35:56 -0800226 }
227
228 public int leaseDurationMillis() {
229 return leaseDurationMillis;
230 }
231
Madan Jampani1769a1a2014-11-19 21:51:44 -0800232 public CompletableFuture<Void> future() {
Madan Jampani12390c12014-11-12 00:35:56 -0800233 return future;
234 }
235 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800236}