blob: f83b042e251b8c2d23c197d3b7ceeccf22df0a58 [file] [log] [blame]
Madan Jampani12390c12014-11-12 00:35:56 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.Iterator;
6import java.util.List;
7import java.util.concurrent.CompletableFuture;
8
9import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Deactivate;
12import org.apache.felix.scr.annotations.Reference;
13import org.apache.felix.scr.annotations.ReferenceCardinality;
14import org.apache.felix.scr.annotations.Service;
15import org.joda.time.DateTime;
16import org.onlab.onos.cluster.ClusterService;
17import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
18import org.onlab.onos.store.cluster.messaging.ClusterMessage;
19import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
20import org.onlab.onos.store.service.DatabaseService;
21import org.onlab.onos.store.service.Lock;
22import org.onlab.onos.store.service.LockEventListener;
23import org.onlab.onos.store.service.LockService;
24import org.slf4j.Logger;
25
26import com.google.common.collect.ArrayListMultimap;
27
28@Component(immediate = true)
29@Service
30public class DistributedLockManager implements LockService {
31
32 private final Logger log = getLogger(getClass());
33
34 public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
35
36 private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap.create();
37
38 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
39 private ClusterCommunicationService clusterCommunicator;
40
41 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
42 private DatabaseService databaseService;
43
44 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
45 private ClusterService clusterService;
46
47 @Activate
48 public void activate() {
49 clusterCommunicator.addSubscriber(
50 DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
51 new LockEventMessageListener());
52 log.info("Started.");
53
54 }
55
56 @Deactivate
57 public void deactivate() {
58 locksToAcquire.clear();
59 log.info("Started.");
60 }
61
62 @Override
63 public Lock create(String path) {
64 return new DistributedLock(
65 path,
66 databaseService,
67 clusterService,
68 this);
69 }
70
71 @Override
72 public void addListener(LockEventListener listener) {
73 // FIXME:
74 throw new UnsupportedOperationException();
75 }
76
77 @Override
78 public void removeListener(LockEventListener listener) {
79 // FIXME:
80 throw new UnsupportedOperationException();
81 }
82
Madan Jampani44e6a542014-11-12 01:06:51 -080083 protected CompletableFuture<Void> lockIfAvailable(
84 Lock lock,
85 long waitTimeMillis,
86 int leaseDurationMillis) {
Madan Jampani12390c12014-11-12 00:35:56 -080087 CompletableFuture<Void> future = new CompletableFuture<>();
88 locksToAcquire.put(
89 lock.path(),
90 new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future));
91 return future;
92 }
93
94 private class LockEventMessageListener implements ClusterMessageHandler {
95 @Override
96 public void handle(ClusterMessage message) {
97 TableModificationEvent event = DatabaseStateMachine.SERIALIZER.decode(message.payload());
98 if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)) {
99 return;
100 }
101
102 String path = event.key();
103 if (!locksToAcquire.containsKey(path)) {
104 return;
105 }
106
107 if (event.type() == TableModificationEvent.Type.ROW_DELETED) {
108 List<LockRequest> existingRequests = locksToAcquire.get(path);
Madan Jampani44e6a542014-11-12 01:06:51 -0800109 if (existingRequests == null) {
110 return;
111 }
Madan Jampani12390c12014-11-12 00:35:56 -0800112
113 Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
114 while (existingRequestIterator.hasNext()) {
115 LockRequest request = existingRequestIterator.next();
116 if (request.expirationTime().isAfter(DateTime.now())) {
117 existingRequestIterator.remove();
118 } else {
Madan Jampani44e6a542014-11-12 01:06:51 -0800119 if (request.lock().tryLock(request.leaseDurationMillis())) {
Madan Jampani12390c12014-11-12 00:35:56 -0800120 request.future().complete(null);
121 existingRequests.remove(0);
122 }
123 }
124 }
125 }
126 }
127 }
128
129 private class LockRequest {
130
131 private final Lock lock;
132 private final DateTime expirationTime;
133 private final int leaseDurationMillis;
134 private final CompletableFuture<Void> future;
135
136 public LockRequest(
137 Lock lock,
138 long waitTimeMillis,
139 int leaseDurationMillis,
140 CompletableFuture<Void> future) {
141
142 this.lock = lock;
143 this.expirationTime = DateTime.now().plusMillis((int) waitTimeMillis);
144 this.leaseDurationMillis = leaseDurationMillis;
145 this.future = future;
146 }
147
148 public Lock lock() {
149 return lock;
150 }
151
152 public DateTime expirationTime() {
153 return expirationTime;
154 }
155
156 public int leaseDurationMillis() {
157 return leaseDurationMillis;
158 }
159
160 public CompletableFuture<Void> future() {
161 return future;
162 }
163 }
164}