blob: a8941aaacb4b1b8e8228ccb51e83beed01483ec3 [file] [log] [blame]
Madan Jampani12390c12014-11-12 00:35:56 -08001package org.onlab.onos.store.service.impl;
2
Madan Jampania88d1f52014-11-14 16:45:24 -08003import static org.onlab.util.Tools.namedThreads;
Madan Jampani12390c12014-11-12 00:35:56 -08004import static org.slf4j.LoggerFactory.getLogger;
5
6import java.util.Iterator;
7import java.util.List;
8import java.util.concurrent.CompletableFuture;
Madan Jampania88d1f52014-11-14 16:45:24 -08009import java.util.concurrent.ExecutorService;
10import java.util.concurrent.Executors;
Madan Jampani12390c12014-11-12 00:35:56 -080011
12import org.apache.felix.scr.annotations.Activate;
13import org.apache.felix.scr.annotations.Component;
14import org.apache.felix.scr.annotations.Deactivate;
15import org.apache.felix.scr.annotations.Reference;
16import org.apache.felix.scr.annotations.ReferenceCardinality;
17import org.apache.felix.scr.annotations.Service;
18import org.joda.time.DateTime;
19import org.onlab.onos.cluster.ClusterService;
20import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
21import org.onlab.onos.store.cluster.messaging.ClusterMessage;
22import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
23import org.onlab.onos.store.service.DatabaseService;
24import org.onlab.onos.store.service.Lock;
25import org.onlab.onos.store.service.LockEventListener;
26import org.onlab.onos.store.service.LockService;
27import org.slf4j.Logger;
28
Madan Jampania88d1f52014-11-14 16:45:24 -080029import com.google.common.collect.LinkedListMultimap;
30import com.google.common.collect.ListMultimap;
31import com.google.common.collect.Multimaps;
Madan Jampani12390c12014-11-12 00:35:56 -080032
33@Component(immediate = true)
34@Service
35public class DistributedLockManager implements LockService {
36
Madan Jampania88d1f52014-11-14 16:45:24 -080037 private static final ExecutorService THREAD_POOL =
38 Executors.newCachedThreadPool(namedThreads("lock-manager-%d"));
39
Madan Jampani12390c12014-11-12 00:35:56 -080040 private final Logger log = getLogger(getClass());
41
42 public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
43
Madan Jampania88d1f52014-11-14 16:45:24 -080044 private final ListMultimap<String, LockRequest> locksToAcquire =
45 Multimaps.synchronizedListMultimap(LinkedListMultimap.<String, LockRequest>create());
Madan Jampani12390c12014-11-12 00:35:56 -080046
47 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
48 private ClusterCommunicationService clusterCommunicator;
49
50 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
51 private DatabaseService databaseService;
52
53 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
54 private ClusterService clusterService;
55
56 @Activate
57 public void activate() {
58 clusterCommunicator.addSubscriber(
59 DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
60 new LockEventMessageListener());
61 log.info("Started.");
62
63 }
64
65 @Deactivate
66 public void deactivate() {
Madan Jampania88d1f52014-11-14 16:45:24 -080067 clusterCommunicator.removeSubscriber(DatabaseStateMachine.DATABASE_UPDATE_EVENTS);
Madan Jampani12390c12014-11-12 00:35:56 -080068 locksToAcquire.clear();
Madan Jampania88d1f52014-11-14 16:45:24 -080069 log.info("Stopped.");
Madan Jampani12390c12014-11-12 00:35:56 -080070 }
71
72 @Override
73 public Lock create(String path) {
Madan Jampani9b37d572014-11-12 11:53:24 -080074 return new DistributedLock(path, databaseService, clusterService, this);
Madan Jampani12390c12014-11-12 00:35:56 -080075 }
76
77 @Override
78 public void addListener(LockEventListener listener) {
79 // FIXME:
80 throw new UnsupportedOperationException();
81 }
82
83 @Override
84 public void removeListener(LockEventListener listener) {
85 // FIXME:
86 throw new UnsupportedOperationException();
87 }
88
Madan Jampania88d1f52014-11-14 16:45:24 -080089 /**
90 * Attempts to acquire the lock as soon as it becomes available.
91 * @param lock lock to acquire.
92 * @param waitTimeMillis maximum time to wait before giving up.
93 * @param leaseDurationMillis the duration for which to acquire the lock initially.
94 * @return Future lease expiration date.
95 */
96 protected CompletableFuture<DateTime> lockIfAvailable(
97 Lock lock,
98 long waitTimeMillis,
99 int leaseDurationMillis) {
100 CompletableFuture<DateTime> future = new CompletableFuture<>();
101 locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future));
Madan Jampani12390c12014-11-12 00:35:56 -0800102 return future;
103 }
104
105 private class LockEventMessageListener implements ClusterMessageHandler {
106 @Override
107 public void handle(ClusterMessage message) {
Madan Jampani9b37d572014-11-12 11:53:24 -0800108 TableModificationEvent event = DatabaseStateMachine.SERIALIZER
109 .decode(message.payload());
Madan Jampania88d1f52014-11-14 16:45:24 -0800110 if (event.tableName().equals(ONOS_LOCK_TABLE_NAME) &&
111 event.type().equals(TableModificationEvent.Type.ROW_DELETED)) {
112 THREAD_POOL.submit(new RetryLockTask(event.key()));
Madan Jampani12390c12014-11-12 00:35:56 -0800113 }
Madan Jampania88d1f52014-11-14 16:45:24 -0800114 }
115 }
Madan Jampani12390c12014-11-12 00:35:56 -0800116
Madan Jampania88d1f52014-11-14 16:45:24 -0800117 private class RetryLockTask implements Runnable {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800118
Madan Jampania88d1f52014-11-14 16:45:24 -0800119 private final String path;
120
121 public RetryLockTask(String path) {
122 this.path = path;
123 }
124
125 @Override
126 public void run() {
Madan Jampani12390c12014-11-12 00:35:56 -0800127 if (!locksToAcquire.containsKey(path)) {
128 return;
129 }
130
Madan Jampania88d1f52014-11-14 16:45:24 -0800131 List<LockRequest> existingRequests = locksToAcquire.get(path);
132 if (existingRequests == null || existingRequests.isEmpty()) {
133 return;
134 }
135 log.info("Path {} is now available for locking. There are {} outstanding "
136 + "requests for it.",
137 path, existingRequests.size());
Madan Jampani12390c12014-11-12 00:35:56 -0800138
Madan Jampania88d1f52014-11-14 16:45:24 -0800139 synchronized (existingRequests) {
140 Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
141 while (existingRequestIterator.hasNext()) {
142 LockRequest request = existingRequestIterator.next();
143 if (DateTime.now().isAfter(request.requestExpirationTime())) {
144 // request expired.
145 existingRequestIterator.remove();
146 } else {
147 if (request.lock().tryLock(request.leaseDurationMillis())) {
148 request.future().complete(DateTime.now().plusMillis(request.leaseDurationMillis()));
Madan Jampani9b37d572014-11-12 11:53:24 -0800149 existingRequestIterator.remove();
Madan Jampani12390c12014-11-12 00:35:56 -0800150 }
151 }
152 }
153 }
154 }
155 }
156
157 private class LockRequest {
158
159 private final Lock lock;
Madan Jampania88d1f52014-11-14 16:45:24 -0800160 private final DateTime requestExpirationTime;
Madan Jampani12390c12014-11-12 00:35:56 -0800161 private final int leaseDurationMillis;
Madan Jampania88d1f52014-11-14 16:45:24 -0800162 private final CompletableFuture<DateTime> future;
Madan Jampani12390c12014-11-12 00:35:56 -0800163
Madan Jampani9b37d572014-11-12 11:53:24 -0800164 public LockRequest(Lock lock, long waitTimeMillis,
Madan Jampania88d1f52014-11-14 16:45:24 -0800165 int leaseDurationMillis, CompletableFuture<DateTime> future) {
Madan Jampani12390c12014-11-12 00:35:56 -0800166
167 this.lock = lock;
Madan Jampania88d1f52014-11-14 16:45:24 -0800168 this.requestExpirationTime = DateTime.now().plusMillis((int) waitTimeMillis);
Madan Jampani12390c12014-11-12 00:35:56 -0800169 this.leaseDurationMillis = leaseDurationMillis;
170 this.future = future;
171 }
172
173 public Lock lock() {
174 return lock;
175 }
176
Madan Jampania88d1f52014-11-14 16:45:24 -0800177 public DateTime requestExpirationTime() {
178 return requestExpirationTime;
Madan Jampani12390c12014-11-12 00:35:56 -0800179 }
180
181 public int leaseDurationMillis() {
182 return leaseDurationMillis;
183 }
184
Madan Jampania88d1f52014-11-14 16:45:24 -0800185 public CompletableFuture<DateTime> future() {
Madan Jampani12390c12014-11-12 00:35:56 -0800186 return future;
187 }
188 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800189}