blob: 6d99ba73b9143c69fdd1900f5bcbbc45d2f6537d [file] [log] [blame]
Madan Jampani12390c12014-11-12 00:35:56 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.Iterator;
6import java.util.List;
7import java.util.concurrent.CompletableFuture;
8
9import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Deactivate;
12import org.apache.felix.scr.annotations.Reference;
13import org.apache.felix.scr.annotations.ReferenceCardinality;
14import org.apache.felix.scr.annotations.Service;
15import org.joda.time.DateTime;
16import org.onlab.onos.cluster.ClusterService;
17import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
18import org.onlab.onos.store.cluster.messaging.ClusterMessage;
19import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
20import org.onlab.onos.store.service.DatabaseService;
21import org.onlab.onos.store.service.Lock;
22import org.onlab.onos.store.service.LockEventListener;
23import org.onlab.onos.store.service.LockService;
24import org.slf4j.Logger;
25
26import com.google.common.collect.ArrayListMultimap;
27
28@Component(immediate = true)
29@Service
30public class DistributedLockManager implements LockService {
31
32 private final Logger log = getLogger(getClass());
33
34 public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
35
Madan Jampani9b37d572014-11-12 11:53:24 -080036 private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap
37 .create();
Madan Jampani12390c12014-11-12 00:35:56 -080038
39 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
40 private ClusterCommunicationService clusterCommunicator;
41
42 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
43 private DatabaseService databaseService;
44
45 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
46 private ClusterService clusterService;
47
48 @Activate
49 public void activate() {
50 clusterCommunicator.addSubscriber(
51 DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
52 new LockEventMessageListener());
53 log.info("Started.");
54
55 }
56
57 @Deactivate
58 public void deactivate() {
59 locksToAcquire.clear();
60 log.info("Started.");
61 }
62
63 @Override
64 public Lock create(String path) {
Madan Jampani9b37d572014-11-12 11:53:24 -080065 return new DistributedLock(path, databaseService, clusterService, this);
Madan Jampani12390c12014-11-12 00:35:56 -080066 }
67
68 @Override
69 public void addListener(LockEventListener listener) {
70 // FIXME:
71 throw new UnsupportedOperationException();
72 }
73
74 @Override
75 public void removeListener(LockEventListener listener) {
76 // FIXME:
77 throw new UnsupportedOperationException();
78 }
79
Madan Jampani9b37d572014-11-12 11:53:24 -080080 protected CompletableFuture<Void> lockIfAvailable(Lock lock,
81 long waitTimeMillis, int leaseDurationMillis) {
Madan Jampani12390c12014-11-12 00:35:56 -080082 CompletableFuture<Void> future = new CompletableFuture<>();
Madan Jampani9b37d572014-11-12 11:53:24 -080083 locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis,
84 leaseDurationMillis, future));
Madan Jampani12390c12014-11-12 00:35:56 -080085 return future;
86 }
87
88 private class LockEventMessageListener implements ClusterMessageHandler {
89 @Override
90 public void handle(ClusterMessage message) {
Madan Jampani9b37d572014-11-12 11:53:24 -080091 TableModificationEvent event = DatabaseStateMachine.SERIALIZER
92 .decode(message.payload());
Madan Jampani12390c12014-11-12 00:35:56 -080093 if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)) {
94 return;
95 }
96
97 String path = event.key();
98 if (!locksToAcquire.containsKey(path)) {
99 return;
100 }
101
102 if (event.type() == TableModificationEvent.Type.ROW_DELETED) {
103 List<LockRequest> existingRequests = locksToAcquire.get(path);
Madan Jampani44e6a542014-11-12 01:06:51 -0800104 if (existingRequests == null) {
105 return;
106 }
Madan Jampani12390c12014-11-12 00:35:56 -0800107
Madan Jampani9b37d572014-11-12 11:53:24 -0800108 synchronized (existingRequests) {
109
110 Iterator<LockRequest> existingRequestIterator = existingRequests
111 .iterator();
112 while (existingRequestIterator.hasNext()) {
113 LockRequest request = existingRequestIterator.next();
114 if (request.expirationTime().isAfter(DateTime.now())) {
115 existingRequestIterator.remove();
116 } else {
117 if (request.lock().tryLock(
118 request.leaseDurationMillis())) {
119 request.future().complete(null);
120 existingRequestIterator.remove();
121 }
Madan Jampani12390c12014-11-12 00:35:56 -0800122 }
123 }
124 }
125 }
126 }
127 }
128
129 private class LockRequest {
130
131 private final Lock lock;
132 private final DateTime expirationTime;
133 private final int leaseDurationMillis;
134 private final CompletableFuture<Void> future;
135
Madan Jampani9b37d572014-11-12 11:53:24 -0800136 public LockRequest(Lock lock, long waitTimeMillis,
137 int leaseDurationMillis, CompletableFuture<Void> future) {
Madan Jampani12390c12014-11-12 00:35:56 -0800138
139 this.lock = lock;
Madan Jampani9b37d572014-11-12 11:53:24 -0800140 this.expirationTime = DateTime.now().plusMillis(
141 (int) waitTimeMillis);
Madan Jampani12390c12014-11-12 00:35:56 -0800142 this.leaseDurationMillis = leaseDurationMillis;
143 this.future = future;
144 }
145
146 public Lock lock() {
147 return lock;
148 }
149
150 public DateTime expirationTime() {
151 return expirationTime;
152 }
153
154 public int leaseDurationMillis() {
155 return leaseDurationMillis;
156 }
157
158 public CompletableFuture<Void> future() {
159 return future;
160 }
161 }
162}