blob: ee0b54fd3cd5816c4723e6a159c50eb4174dc971 [file] [log] [blame]
Brian O'Connorabafb502014-12-02 22:26:20 -08001package org.onosproject.store.service.impl;
Madan Jampani12390c12014-11-12 00:35:56 -08002
Madan Jampania88d1f52014-11-14 16:45:24 -08003import static org.onlab.util.Tools.namedThreads;
Madan Jampani12390c12014-11-12 00:35:56 -08004import static org.slf4j.LoggerFactory.getLogger;
5
6import java.util.Iterator;
7import java.util.List;
Madan Jampaniac201952014-11-18 10:06:01 -08008import java.util.Set;
Madan Jampani12390c12014-11-12 00:35:56 -08009import java.util.concurrent.CompletableFuture;
Madan Jampania88d1f52014-11-14 16:45:24 -080010import java.util.concurrent.ExecutorService;
11import java.util.concurrent.Executors;
Madan Jampani12390c12014-11-12 00:35:56 -080012
13import org.apache.felix.scr.annotations.Activate;
14import org.apache.felix.scr.annotations.Component;
15import org.apache.felix.scr.annotations.Deactivate;
16import org.apache.felix.scr.annotations.Reference;
17import org.apache.felix.scr.annotations.ReferenceCardinality;
18import org.apache.felix.scr.annotations.Service;
19import org.joda.time.DateTime;
Brian O'Connorabafb502014-12-02 22:26:20 -080020import org.onosproject.cluster.ClusterService;
21import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
22import org.onosproject.store.cluster.messaging.ClusterMessage;
23import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
24import org.onosproject.store.service.DatabaseAdminService;
25import org.onosproject.store.service.DatabaseException;
26import org.onosproject.store.service.DatabaseService;
27import org.onosproject.store.service.Lock;
28import org.onosproject.store.service.LockEventListener;
29import org.onosproject.store.service.LockService;
Madan Jampani12390c12014-11-12 00:35:56 -080030import org.slf4j.Logger;
31
Madan Jampania88d1f52014-11-14 16:45:24 -080032import com.google.common.collect.LinkedListMultimap;
33import com.google.common.collect.ListMultimap;
34import com.google.common.collect.Multimaps;
Madan Jampani12390c12014-11-12 00:35:56 -080035
Brian O'Connor5d55ed42014-12-01 18:27:47 -080036@Component(immediate = false)
Madan Jampani12390c12014-11-12 00:35:56 -080037@Service
38public class DistributedLockManager implements LockService {
39
Madan Jampania88d1f52014-11-14 16:45:24 -080040 private static final ExecutorService THREAD_POOL =
41 Executors.newCachedThreadPool(namedThreads("lock-manager-%d"));
42
Madan Jampani12390c12014-11-12 00:35:56 -080043 private final Logger log = getLogger(getClass());
44
45 public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
46
Madan Jampaniac201952014-11-18 10:06:01 -080047 public static final int DEAD_LOCK_TIMEOUT_MS = 5000;
48
Madan Jampania88d1f52014-11-14 16:45:24 -080049 private final ListMultimap<String, LockRequest> locksToAcquire =
50 Multimaps.synchronizedListMultimap(LinkedListMultimap.<String, LockRequest>create());
Madan Jampani12390c12014-11-12 00:35:56 -080051
52 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
53 private ClusterCommunicationService clusterCommunicator;
54
55 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampaniac201952014-11-18 10:06:01 -080056 private DatabaseAdminService databaseAdminService;
57
58 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani12390c12014-11-12 00:35:56 -080059 private DatabaseService databaseService;
60
61 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
62 private ClusterService clusterService;
63
64 @Activate
65 public void activate() {
Madan Jampaniac201952014-11-18 10:06:01 -080066 try {
Madan Jampani71582ed2014-11-18 10:06:01 -080067 Set<String> tables = databaseAdminService.listTables();
68
69 if (!tables.contains(ONOS_LOCK_TABLE_NAME)) {
Madan Jampaniac201952014-11-18 10:06:01 -080070 if (databaseAdminService.createTable(ONOS_LOCK_TABLE_NAME, DEAD_LOCK_TIMEOUT_MS)) {
71 log.info("Created {} table.", ONOS_LOCK_TABLE_NAME);
72 }
73 }
74 } catch (DatabaseException e) {
75 log.error("DistributedLockManager#activate failed.", e);
Madan Jampaniac201952014-11-18 10:06:01 -080076 }
77
Madan Jampani71582ed2014-11-18 10:06:01 -080078 clusterCommunicator.addSubscriber(
79 DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
80 new LockEventMessageListener());
Madan Jampaniac201952014-11-18 10:06:01 -080081
Madan Jampani71582ed2014-11-18 10:06:01 -080082 log.info("Started");
Madan Jampani12390c12014-11-12 00:35:56 -080083 }
84
85 @Deactivate
86 public void deactivate() {
Madan Jampania88d1f52014-11-14 16:45:24 -080087 clusterCommunicator.removeSubscriber(DatabaseStateMachine.DATABASE_UPDATE_EVENTS);
Madan Jampani12390c12014-11-12 00:35:56 -080088 locksToAcquire.clear();
Madan Jampania88d1f52014-11-14 16:45:24 -080089 log.info("Stopped.");
Madan Jampani12390c12014-11-12 00:35:56 -080090 }
91
92 @Override
93 public Lock create(String path) {
Madan Jampani9b37d572014-11-12 11:53:24 -080094 return new DistributedLock(path, databaseService, clusterService, this);
Madan Jampani12390c12014-11-12 00:35:56 -080095 }
96
97 @Override
98 public void addListener(LockEventListener listener) {
Madan Jampani12390c12014-11-12 00:35:56 -080099 throw new UnsupportedOperationException();
100 }
101
102 @Override
103 public void removeListener(LockEventListener listener) {
Madan Jampani12390c12014-11-12 00:35:56 -0800104 throw new UnsupportedOperationException();
105 }
106
Madan Jampania88d1f52014-11-14 16:45:24 -0800107 /**
108 * Attempts to acquire the lock as soon as it becomes available.
109 * @param lock lock to acquire.
110 * @param waitTimeMillis maximum time to wait before giving up.
111 * @param leaseDurationMillis the duration for which to acquire the lock initially.
Madan Jampani1769a1a2014-11-19 21:51:44 -0800112 * @return Future that can be blocked on until lock becomes available.
Madan Jampania88d1f52014-11-14 16:45:24 -0800113 */
Madan Jampani1769a1a2014-11-19 21:51:44 -0800114 protected CompletableFuture<Void> lockIfAvailable(
Madan Jampania88d1f52014-11-14 16:45:24 -0800115 Lock lock,
Madan Jampani1769a1a2014-11-19 21:51:44 -0800116 int waitTimeMillis,
Madan Jampania88d1f52014-11-14 16:45:24 -0800117 int leaseDurationMillis) {
Madan Jampani1769a1a2014-11-19 21:51:44 -0800118 CompletableFuture<Void> future = new CompletableFuture<>();
Madan Jampani71582ed2014-11-18 10:06:01 -0800119 LockRequest request = new LockRequest(
120 lock,
121 leaseDurationMillis,
Madan Jampani1769a1a2014-11-19 21:51:44 -0800122 DateTime.now().plusMillis(waitTimeMillis),
Madan Jampani71582ed2014-11-18 10:06:01 -0800123 future);
124 locksToAcquire.put(lock.path(), request);
125 return future;
126 }
127
128 /**
129 * Attempts to acquire the lock as soon as it becomes available.
130 * @param lock lock to acquire.
131 * @param leaseDurationMillis the duration for which to acquire the lock initially.
132 * @return Future lease expiration date.
133 */
Madan Jampani1769a1a2014-11-19 21:51:44 -0800134 protected CompletableFuture<Void> lockIfAvailable(
Madan Jampani71582ed2014-11-18 10:06:01 -0800135 Lock lock,
136 int leaseDurationMillis) {
Madan Jampani1769a1a2014-11-19 21:51:44 -0800137 CompletableFuture<Void> future = new CompletableFuture<>();
Madan Jampani71582ed2014-11-18 10:06:01 -0800138 LockRequest request = new LockRequest(
139 lock,
140 leaseDurationMillis,
141 DateTime.now().plusYears(100),
142 future);
143 locksToAcquire.put(lock.path(), request);
Madan Jampani12390c12014-11-12 00:35:56 -0800144 return future;
145 }
146
147 private class LockEventMessageListener implements ClusterMessageHandler {
148 @Override
149 public void handle(ClusterMessage message) {
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800150 TableModificationEvent event = ClusterMessagingProtocol.DB_SERIALIZER
Madan Jampani9b37d572014-11-12 11:53:24 -0800151 .decode(message.payload());
Madan Jampania88d1f52014-11-14 16:45:24 -0800152 if (event.tableName().equals(ONOS_LOCK_TABLE_NAME) &&
153 event.type().equals(TableModificationEvent.Type.ROW_DELETED)) {
154 THREAD_POOL.submit(new RetryLockTask(event.key()));
Madan Jampani12390c12014-11-12 00:35:56 -0800155 }
Madan Jampania88d1f52014-11-14 16:45:24 -0800156 }
157 }
Madan Jampani12390c12014-11-12 00:35:56 -0800158
Madan Jampania88d1f52014-11-14 16:45:24 -0800159 private class RetryLockTask implements Runnable {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800160
Madan Jampania88d1f52014-11-14 16:45:24 -0800161 private final String path;
162
163 public RetryLockTask(String path) {
164 this.path = path;
165 }
166
167 @Override
168 public void run() {
Madan Jampani12390c12014-11-12 00:35:56 -0800169 if (!locksToAcquire.containsKey(path)) {
170 return;
171 }
172
Madan Jampania88d1f52014-11-14 16:45:24 -0800173 List<LockRequest> existingRequests = locksToAcquire.get(path);
174 if (existingRequests == null || existingRequests.isEmpty()) {
175 return;
176 }
177 log.info("Path {} is now available for locking. There are {} outstanding "
178 + "requests for it.",
179 path, existingRequests.size());
Madan Jampani12390c12014-11-12 00:35:56 -0800180
Madan Jampania88d1f52014-11-14 16:45:24 -0800181 synchronized (existingRequests) {
182 Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
183 while (existingRequestIterator.hasNext()) {
184 LockRequest request = existingRequestIterator.next();
185 if (DateTime.now().isAfter(request.requestExpirationTime())) {
186 // request expired.
187 existingRequestIterator.remove();
188 } else {
189 if (request.lock().tryLock(request.leaseDurationMillis())) {
Madan Jampani1769a1a2014-11-19 21:51:44 -0800190 request.future().complete(null);
Madan Jampani9b37d572014-11-12 11:53:24 -0800191 existingRequestIterator.remove();
Madan Jampani12390c12014-11-12 00:35:56 -0800192 }
193 }
194 }
195 }
196 }
197 }
198
199 private class LockRequest {
200
201 private final Lock lock;
Madan Jampania88d1f52014-11-14 16:45:24 -0800202 private final DateTime requestExpirationTime;
Madan Jampani12390c12014-11-12 00:35:56 -0800203 private final int leaseDurationMillis;
Madan Jampani1769a1a2014-11-19 21:51:44 -0800204 private final CompletableFuture<Void> future;
Madan Jampani12390c12014-11-12 00:35:56 -0800205
Madan Jampani71582ed2014-11-18 10:06:01 -0800206 public LockRequest(
207 Lock lock,
208 int leaseDurationMillis,
209 DateTime requestExpirationTime,
Madan Jampani1769a1a2014-11-19 21:51:44 -0800210 CompletableFuture<Void> future) {
Madan Jampani12390c12014-11-12 00:35:56 -0800211
212 this.lock = lock;
Madan Jampani71582ed2014-11-18 10:06:01 -0800213 this.requestExpirationTime = requestExpirationTime;
Madan Jampani12390c12014-11-12 00:35:56 -0800214 this.leaseDurationMillis = leaseDurationMillis;
215 this.future = future;
216 }
217
218 public Lock lock() {
219 return lock;
220 }
221
Madan Jampania88d1f52014-11-14 16:45:24 -0800222 public DateTime requestExpirationTime() {
223 return requestExpirationTime;
Madan Jampani12390c12014-11-12 00:35:56 -0800224 }
225
226 public int leaseDurationMillis() {
227 return leaseDurationMillis;
228 }
229
Madan Jampani1769a1a2014-11-19 21:51:44 -0800230 public CompletableFuture<Void> future() {
Madan Jampani12390c12014-11-12 00:35:56 -0800231 return future;
232 }
233 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800234}