blob: 49981792672578cd8164b0392631c526f163e53a [file] [log] [blame]
Madan Jampani12390c12014-11-12 00:35:56 -08001package org.onlab.onos.store.service.impl;
2
Madan Jampania88d1f52014-11-14 16:45:24 -08003import static org.slf4j.LoggerFactory.getLogger;
4
Ray Milkey241b96a2014-11-17 13:08:20 -08005import java.nio.charset.StandardCharsets;
Madan Jampani12390c12014-11-12 00:35:56 -08006import java.util.UUID;
7import java.util.concurrent.CompletableFuture;
8import java.util.concurrent.ExecutionException;
9import java.util.concurrent.TimeUnit;
10import java.util.concurrent.TimeoutException;
11import java.util.concurrent.atomic.AtomicBoolean;
12
13import org.joda.time.DateTime;
14import org.onlab.onos.cluster.ClusterService;
15import org.onlab.onos.store.service.DatabaseService;
16import org.onlab.onos.store.service.Lock;
Madan Jampania88d1f52014-11-14 16:45:24 -080017import org.slf4j.Logger;
Madan Jampani12390c12014-11-12 00:35:56 -080018
19/**
20 * A distributed lock implementation.
21 */
22public class DistributedLock implements Lock {
23
Madan Jampania88d1f52014-11-14 16:45:24 -080024 private final Logger log = getLogger(getClass());
25
26 private static final long MAX_WAIT_TIME_MS = 100000000L;
27
Madan Jampani12390c12014-11-12 00:35:56 -080028 private final DistributedLockManager lockManager;
29 private final DatabaseService databaseService;
30 private final String path;
31 private DateTime lockExpirationTime;
32 private AtomicBoolean isLocked = new AtomicBoolean(false);
33 private byte[] lockId;
34
35 public DistributedLock(
36 String path,
37 DatabaseService databaseService,
38 ClusterService clusterService,
39 DistributedLockManager lockManager) {
40
41 this.path = path;
42 this.databaseService = databaseService;
43 this.lockManager = lockManager;
44 this.lockId =
Ray Milkey241b96a2014-11-17 13:08:20 -080045 (UUID.randomUUID().toString() + "::" +
46 clusterService.getLocalNode().id().toString()).
47 getBytes(StandardCharsets.UTF_8);
Madan Jampani12390c12014-11-12 00:35:56 -080048 }
49
50 @Override
51 public String path() {
52 return path;
53 }
54
55 @Override
56 public void lock(int leaseDurationMillis) {
Madan Jampani12390c12014-11-12 00:35:56 -080057 if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
58 // Nothing to do.
59 // Current expiration time is beyond what is requested.
60 return;
61 } else {
Madan Jampania88d1f52014-11-14 16:45:24 -080062 tryLock(MAX_WAIT_TIME_MS, leaseDurationMillis);
Madan Jampani12390c12014-11-12 00:35:56 -080063 }
64 }
65
66 @Override
67 public boolean tryLock(int leaseDurationMillis) {
Madan Jampania88d1f52014-11-14 16:45:24 -080068 if (databaseService.putIfAbsent(
Madan Jampanic22123d2014-11-12 02:12:19 -080069 DistributedLockManager.ONOS_LOCK_TABLE_NAME,
70 path,
Madan Jampania88d1f52014-11-14 16:45:24 -080071 lockId)) {
72 isLocked.set(true);
73 lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
74 return true;
75 }
76 return false;
Madan Jampani12390c12014-11-12 00:35:56 -080077 }
78
79 @Override
80 public boolean tryLock(
81 long waitTimeMillis,
82 int leaseDurationMillis) {
Madan Jampania88d1f52014-11-14 16:45:24 -080083 if (tryLock(leaseDurationMillis)) {
84 return true;
Madan Jampani12390c12014-11-12 00:35:56 -080085 }
Madan Jampania88d1f52014-11-14 16:45:24 -080086
87 CompletableFuture<DateTime> future =
88 lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
89 try {
90 lockExpirationTime = future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
91 return true;
92 } catch (ExecutionException | InterruptedException e) {
93 log.error("Encountered an exception trying to acquire lock for " + path, e);
94 // TODO: ExecutionException could indicate something
95 // wrong with the backing database.
96 // Throw an exception?
97 return false;
98 } catch (TimeoutException e) {
99 log.debug("Timed out waiting to acquire lock for {}", path);
100 return false;
101 }
Madan Jampani12390c12014-11-12 00:35:56 -0800102 }
103
104 @Override
105 public boolean isLocked() {
106 if (isLocked.get()) {
107 // We rely on local information to check
Madan Jampania88d1f52014-11-14 16:45:24 -0800108 // if the lock expired.
Madan Jampani12390c12014-11-12 00:35:56 -0800109 // This should should make this call
Madan Jampania88d1f52014-11-14 16:45:24 -0800110 // light weight, while still retaining the
Madan Jampani12390c12014-11-12 00:35:56 -0800111 // safety guarantees.
112 if (DateTime.now().isAfter(lockExpirationTime)) {
113 isLocked.set(false);
114 return false;
Madan Jampanif5d263b2014-11-13 10:04:40 -0800115 } else {
116 return true;
Madan Jampani12390c12014-11-12 00:35:56 -0800117 }
118 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800119 return false;
Madan Jampani12390c12014-11-12 00:35:56 -0800120 }
121
122 @Override
123 public void unlock() {
124 if (!isLocked()) {
125 return;
126 } else {
Madan Jampania88d1f52014-11-14 16:45:24 -0800127 if (databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId)) {
128 isLocked.set(false);
129 }
Madan Jampani12390c12014-11-12 00:35:56 -0800130 }
131 }
132
133 @Override
134 public boolean extendExpiration(int leaseDurationMillis) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800135 if (!isLocked()) {
136 log.warn("Ignoring request to extend expiration for lock {}."
137 + " ExtendExpiration must be called for locks that are already acquired.", path);
Pavlin Radoslavov20ded692014-11-17 16:03:15 -0800138 return false;
Madan Jampania88d1f52014-11-14 16:45:24 -0800139 }
140
141 if (databaseService.putIfValueMatches(
142 DistributedLockManager.ONOS_LOCK_TABLE_NAME,
143 path,
144 lockId,
145 lockId)) {
146 lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
147 log.debug("Succeeded in extending lock {} expiration time to {}", lockExpirationTime);
Madan Jampani12390c12014-11-12 00:35:56 -0800148 return true;
149 } else {
Madan Jampania88d1f52014-11-14 16:45:24 -0800150 log.info("Failed to extend expiration for {}", path);
151 return false;
Madan Jampani12390c12014-11-12 00:35:56 -0800152 }
153 }
Madan Jampania88d1f52014-11-14 16:45:24 -0800154}