blob: ea0f7d5634dc6418cdfaaaea6ddc5016c7932632 [file] [log] [blame]
Madan Jampani12390c12014-11-12 00:35:56 -08001package org.onlab.onos.store.service.impl;
2
Madan Jampania88d1f52014-11-14 16:45:24 -08003import static org.onlab.util.Tools.namedThreads;
Madan Jampani12390c12014-11-12 00:35:56 -08004import static org.slf4j.LoggerFactory.getLogger;
5
6import java.util.Iterator;
7import java.util.List;
Madan Jampaniac201952014-11-18 10:06:01 -08008import java.util.Set;
Madan Jampani12390c12014-11-12 00:35:56 -08009import java.util.concurrent.CompletableFuture;
Madan Jampania88d1f52014-11-14 16:45:24 -080010import java.util.concurrent.ExecutorService;
11import java.util.concurrent.Executors;
Madan Jampani12390c12014-11-12 00:35:56 -080012
13import org.apache.felix.scr.annotations.Activate;
14import org.apache.felix.scr.annotations.Component;
15import org.apache.felix.scr.annotations.Deactivate;
16import org.apache.felix.scr.annotations.Reference;
17import org.apache.felix.scr.annotations.ReferenceCardinality;
18import org.apache.felix.scr.annotations.Service;
19import org.joda.time.DateTime;
20import org.onlab.onos.cluster.ClusterService;
21import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
22import org.onlab.onos.store.cluster.messaging.ClusterMessage;
23import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampaniac201952014-11-18 10:06:01 -080024import org.onlab.onos.store.service.DatabaseAdminService;
25import org.onlab.onos.store.service.DatabaseException;
Madan Jampani12390c12014-11-12 00:35:56 -080026import org.onlab.onos.store.service.DatabaseService;
27import org.onlab.onos.store.service.Lock;
28import org.onlab.onos.store.service.LockEventListener;
29import org.onlab.onos.store.service.LockService;
30import org.slf4j.Logger;
31
Madan Jampania88d1f52014-11-14 16:45:24 -080032import com.google.common.collect.LinkedListMultimap;
33import com.google.common.collect.ListMultimap;
34import com.google.common.collect.Multimaps;
Madan Jampani12390c12014-11-12 00:35:56 -080035
36@Component(immediate = true)
37@Service
38public class DistributedLockManager implements LockService {
39
Madan Jampania88d1f52014-11-14 16:45:24 -080040 private static final ExecutorService THREAD_POOL =
41 Executors.newCachedThreadPool(namedThreads("lock-manager-%d"));
42
Madan Jampani12390c12014-11-12 00:35:56 -080043 private final Logger log = getLogger(getClass());
44
45 public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
46
Madan Jampaniac201952014-11-18 10:06:01 -080047 public static final int DEAD_LOCK_TIMEOUT_MS = 5000;
48
Madan Jampania88d1f52014-11-14 16:45:24 -080049 private final ListMultimap<String, LockRequest> locksToAcquire =
50 Multimaps.synchronizedListMultimap(LinkedListMultimap.<String, LockRequest>create());
Madan Jampani12390c12014-11-12 00:35:56 -080051
52 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
53 private ClusterCommunicationService clusterCommunicator;
54
55 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampaniac201952014-11-18 10:06:01 -080056 private DatabaseAdminService databaseAdminService;
57
58 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani12390c12014-11-12 00:35:56 -080059 private DatabaseService databaseService;
60
61 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
62 private ClusterService clusterService;
63
64 @Activate
65 public void activate() {
Madan Jampaniac201952014-11-18 10:06:01 -080066 try {
67 Set<String> tableNames = databaseAdminService.listTables();
68 if (!tableNames.contains(ONOS_LOCK_TABLE_NAME)) {
69 if (databaseAdminService.createTable(ONOS_LOCK_TABLE_NAME, DEAD_LOCK_TIMEOUT_MS)) {
70 log.info("Created {} table.", ONOS_LOCK_TABLE_NAME);
71 }
72 }
73 } catch (DatabaseException e) {
74 log.error("DistributedLockManager#activate failed.", e);
75 throw e;
76 }
77
Madan Jampani12390c12014-11-12 00:35:56 -080078 clusterCommunicator.addSubscriber(
79 DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
80 new LockEventMessageListener());
Madan Jampaniac201952014-11-18 10:06:01 -080081
Madan Jampani12390c12014-11-12 00:35:56 -080082 log.info("Started.");
83
84 }
85
86 @Deactivate
87 public void deactivate() {
Madan Jampania88d1f52014-11-14 16:45:24 -080088 clusterCommunicator.removeSubscriber(DatabaseStateMachine.DATABASE_UPDATE_EVENTS);
Madan Jampani12390c12014-11-12 00:35:56 -080089 locksToAcquire.clear();
Madan Jampania88d1f52014-11-14 16:45:24 -080090 log.info("Stopped.");
Madan Jampani12390c12014-11-12 00:35:56 -080091 }
92
93 @Override
94 public Lock create(String path) {
Madan Jampani9b37d572014-11-12 11:53:24 -080095 return new DistributedLock(path, databaseService, clusterService, this);
Madan Jampani12390c12014-11-12 00:35:56 -080096 }
97
98 @Override
99 public void addListener(LockEventListener listener) {
100 // FIXME:
101 throw new UnsupportedOperationException();
102 }
103
104 @Override
105 public void removeListener(LockEventListener listener) {
106 // FIXME:
107 throw new UnsupportedOperationException();
108 }
109
Madan Jampania88d1f52014-11-14 16:45:24 -0800110 /**
111 * Attempts to acquire the lock as soon as it becomes available.
112 * @param lock lock to acquire.
113 * @param waitTimeMillis maximum time to wait before giving up.
114 * @param leaseDurationMillis the duration for which to acquire the lock initially.
115 * @return Future lease expiration date.
116 */
117 protected CompletableFuture<DateTime> lockIfAvailable(
118 Lock lock,
119 long waitTimeMillis,
120 int leaseDurationMillis) {
121 CompletableFuture<DateTime> future = new CompletableFuture<>();
122 locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future));
Madan Jampani12390c12014-11-12 00:35:56 -0800123 return future;
124 }
125
126 private class LockEventMessageListener implements ClusterMessageHandler {
127 @Override
128 public void handle(ClusterMessage message) {
Madan Jampani9b37d572014-11-12 11:53:24 -0800129 TableModificationEvent event = DatabaseStateMachine.SERIALIZER
130 .decode(message.payload());
Madan Jampania88d1f52014-11-14 16:45:24 -0800131 if (event.tableName().equals(ONOS_LOCK_TABLE_NAME) &&
132 event.type().equals(TableModificationEvent.Type.ROW_DELETED)) {
133 THREAD_POOL.submit(new RetryLockTask(event.key()));
Madan Jampani12390c12014-11-12 00:35:56 -0800134 }
Madan Jampania88d1f52014-11-14 16:45:24 -0800135 }
136 }
Madan Jampani12390c12014-11-12 00:35:56 -0800137
Madan Jampania88d1f52014-11-14 16:45:24 -0800138 private class RetryLockTask implements Runnable {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800139
Madan Jampania88d1f52014-11-14 16:45:24 -0800140 private final String path;
141
142 public RetryLockTask(String path) {
143 this.path = path;
144 }
145
146 @Override
147 public void run() {
Madan Jampani12390c12014-11-12 00:35:56 -0800148 if (!locksToAcquire.containsKey(path)) {
149 return;
150 }
151
Madan Jampania88d1f52014-11-14 16:45:24 -0800152 List<LockRequest> existingRequests = locksToAcquire.get(path);
153 if (existingRequests == null || existingRequests.isEmpty()) {
154 return;
155 }
156 log.info("Path {} is now available for locking. There are {} outstanding "
157 + "requests for it.",
158 path, existingRequests.size());
Madan Jampani12390c12014-11-12 00:35:56 -0800159
Madan Jampania88d1f52014-11-14 16:45:24 -0800160 synchronized (existingRequests) {
161 Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
162 while (existingRequestIterator.hasNext()) {
163 LockRequest request = existingRequestIterator.next();
164 if (DateTime.now().isAfter(request.requestExpirationTime())) {
165 // request expired.
166 existingRequestIterator.remove();
167 } else {
168 if (request.lock().tryLock(request.leaseDurationMillis())) {
169 request.future().complete(DateTime.now().plusMillis(request.leaseDurationMillis()));
Madan Jampani9b37d572014-11-12 11:53:24 -0800170 existingRequestIterator.remove();
Madan Jampani12390c12014-11-12 00:35:56 -0800171 }
172 }
173 }
174 }
175 }
176 }
177
178 private class LockRequest {
179
180 private final Lock lock;
Madan Jampania88d1f52014-11-14 16:45:24 -0800181 private final DateTime requestExpirationTime;
Madan Jampani12390c12014-11-12 00:35:56 -0800182 private final int leaseDurationMillis;
Madan Jampania88d1f52014-11-14 16:45:24 -0800183 private final CompletableFuture<DateTime> future;
Madan Jampani12390c12014-11-12 00:35:56 -0800184
Madan Jampani9b37d572014-11-12 11:53:24 -0800185 public LockRequest(Lock lock, long waitTimeMillis,
Madan Jampania88d1f52014-11-14 16:45:24 -0800186 int leaseDurationMillis, CompletableFuture<DateTime> future) {
Madan Jampani12390c12014-11-12 00:35:56 -0800187
188 this.lock = lock;
Madan Jampania88d1f52014-11-14 16:45:24 -0800189 this.requestExpirationTime = DateTime.now().plusMillis((int) waitTimeMillis);
Madan Jampani12390c12014-11-12 00:35:56 -0800190 this.leaseDurationMillis = leaseDurationMillis;
191 this.future = future;
192 }
193
194 public Lock lock() {
195 return lock;
196 }
197
Madan Jampania88d1f52014-11-14 16:45:24 -0800198 public DateTime requestExpirationTime() {
199 return requestExpirationTime;
Madan Jampani12390c12014-11-12 00:35:56 -0800200 }
201
202 public int leaseDurationMillis() {
203 return leaseDurationMillis;
204 }
205
Madan Jampania88d1f52014-11-14 16:45:24 -0800206 public CompletableFuture<DateTime> future() {
Madan Jampani12390c12014-11-12 00:35:56 -0800207 return future;
208 }
209 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800210}