blob: c8b16e41e491d7eadb7b51c03f3010c53684c587 [file] [log] [blame]
Madan Jampani12390c12014-11-12 00:35:56 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.Iterator;
6import java.util.List;
7import java.util.concurrent.CompletableFuture;
8
9import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Deactivate;
12import org.apache.felix.scr.annotations.Reference;
13import org.apache.felix.scr.annotations.ReferenceCardinality;
14import org.apache.felix.scr.annotations.Service;
15import org.joda.time.DateTime;
16import org.onlab.onos.cluster.ClusterService;
17import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
18import org.onlab.onos.store.cluster.messaging.ClusterMessage;
19import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
20import org.onlab.onos.store.service.DatabaseService;
21import org.onlab.onos.store.service.Lock;
22import org.onlab.onos.store.service.LockEventListener;
23import org.onlab.onos.store.service.LockService;
24import org.slf4j.Logger;
25
26import com.google.common.collect.ArrayListMultimap;
27
28@Component(immediate = true)
29@Service
30public class DistributedLockManager implements LockService {
31
32 private final Logger log = getLogger(getClass());
33
34 public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
35
Madan Jampani9b37d572014-11-12 11:53:24 -080036 private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap
37 .create();
Madan Jampani12390c12014-11-12 00:35:56 -080038
39 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
40 private ClusterCommunicationService clusterCommunicator;
41
42 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
43 private DatabaseService databaseService;
44
45 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
46 private ClusterService clusterService;
47
48 @Activate
49 public void activate() {
50 clusterCommunicator.addSubscriber(
51 DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
52 new LockEventMessageListener());
53 log.info("Started.");
54
55 }
56
57 @Deactivate
58 public void deactivate() {
59 locksToAcquire.clear();
60 log.info("Started.");
61 }
62
63 @Override
64 public Lock create(String path) {
Madan Jampani9b37d572014-11-12 11:53:24 -080065 return new DistributedLock(path, databaseService, clusterService, this);
Madan Jampani12390c12014-11-12 00:35:56 -080066 }
67
68 @Override
69 public void addListener(LockEventListener listener) {
70 // FIXME:
71 throw new UnsupportedOperationException();
72 }
73
74 @Override
75 public void removeListener(LockEventListener listener) {
76 // FIXME:
77 throw new UnsupportedOperationException();
78 }
79
Madan Jampani9b37d572014-11-12 11:53:24 -080080 protected CompletableFuture<Void> lockIfAvailable(Lock lock,
81 long waitTimeMillis, int leaseDurationMillis) {
Madan Jampani12390c12014-11-12 00:35:56 -080082 CompletableFuture<Void> future = new CompletableFuture<>();
Madan Jampani9b37d572014-11-12 11:53:24 -080083 locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis,
84 leaseDurationMillis, future));
Madan Jampani12390c12014-11-12 00:35:56 -080085 return future;
86 }
87
88 private class LockEventMessageListener implements ClusterMessageHandler {
89 @Override
90 public void handle(ClusterMessage message) {
Madan Jampani9b37d572014-11-12 11:53:24 -080091 TableModificationEvent event = DatabaseStateMachine.SERIALIZER
92 .decode(message.payload());
Madan Jampani12390c12014-11-12 00:35:56 -080093 if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)) {
94 return;
95 }
96
Madan Jampanif5d263b2014-11-13 10:04:40 -080097 log.info("Received a lock available event for path: {}", event.key());
98
Madan Jampani12390c12014-11-12 00:35:56 -080099 String path = event.key();
100 if (!locksToAcquire.containsKey(path)) {
101 return;
102 }
103
104 if (event.type() == TableModificationEvent.Type.ROW_DELETED) {
105 List<LockRequest> existingRequests = locksToAcquire.get(path);
Madan Jampani44e6a542014-11-12 01:06:51 -0800106 if (existingRequests == null) {
107 return;
108 }
Madan Jampani12390c12014-11-12 00:35:56 -0800109
Madan Jampani9b37d572014-11-12 11:53:24 -0800110 synchronized (existingRequests) {
111
112 Iterator<LockRequest> existingRequestIterator = existingRequests
113 .iterator();
114 while (existingRequestIterator.hasNext()) {
115 LockRequest request = existingRequestIterator.next();
116 if (request.expirationTime().isAfter(DateTime.now())) {
117 existingRequestIterator.remove();
118 } else {
119 if (request.lock().tryLock(
120 request.leaseDurationMillis())) {
121 request.future().complete(null);
122 existingRequestIterator.remove();
123 }
Madan Jampani12390c12014-11-12 00:35:56 -0800124 }
125 }
126 }
127 }
128 }
129 }
130
131 private class LockRequest {
132
133 private final Lock lock;
134 private final DateTime expirationTime;
135 private final int leaseDurationMillis;
136 private final CompletableFuture<Void> future;
137
Madan Jampani9b37d572014-11-12 11:53:24 -0800138 public LockRequest(Lock lock, long waitTimeMillis,
139 int leaseDurationMillis, CompletableFuture<Void> future) {
Madan Jampani12390c12014-11-12 00:35:56 -0800140
141 this.lock = lock;
Madan Jampani9b37d572014-11-12 11:53:24 -0800142 this.expirationTime = DateTime.now().plusMillis(
143 (int) waitTimeMillis);
Madan Jampani12390c12014-11-12 00:35:56 -0800144 this.leaseDurationMillis = leaseDurationMillis;
145 this.future = future;
146 }
147
148 public Lock lock() {
149 return lock;
150 }
151
152 public DateTime expirationTime() {
153 return expirationTime;
154 }
155
156 public int leaseDurationMillis() {
157 return leaseDurationMillis;
158 }
159
160 public CompletableFuture<Void> future() {
161 return future;
162 }
163 }
Madan Jampanif5d263b2014-11-13 10:04:40 -0800164}