blob: c87ab37bfc0812d25e85aab2fa8ff1307d68d0e1 [file] [log] [blame]
Madan Jampani12390c12014-11-12 00:35:56 -08001package org.onlab.onos.store.service.impl;
2
3import static org.slf4j.LoggerFactory.getLogger;
4
5import java.util.Iterator;
6import java.util.List;
7import java.util.concurrent.CompletableFuture;
8
9import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Deactivate;
12import org.apache.felix.scr.annotations.Reference;
13import org.apache.felix.scr.annotations.ReferenceCardinality;
14import org.apache.felix.scr.annotations.Service;
15import org.joda.time.DateTime;
16import org.onlab.onos.cluster.ClusterService;
17import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
18import org.onlab.onos.store.cluster.messaging.ClusterMessage;
19import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
20import org.onlab.onos.store.service.DatabaseService;
21import org.onlab.onos.store.service.Lock;
22import org.onlab.onos.store.service.LockEventListener;
23import org.onlab.onos.store.service.LockService;
24import org.slf4j.Logger;
25
26import com.google.common.collect.ArrayListMultimap;
27
28@Component(immediate = true)
29@Service
30public class DistributedLockManager implements LockService {
31
32 private final Logger log = getLogger(getClass());
33
34 public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
35
36 private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap.create();
37
38 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
39 private ClusterCommunicationService clusterCommunicator;
40
41 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
42 private DatabaseService databaseService;
43
44 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
45 private ClusterService clusterService;
46
47 @Activate
48 public void activate() {
49 clusterCommunicator.addSubscriber(
50 DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
51 new LockEventMessageListener());
52 log.info("Started.");
53
54 }
55
56 @Deactivate
57 public void deactivate() {
58 locksToAcquire.clear();
59 log.info("Started.");
60 }
61
62 @Override
63 public Lock create(String path) {
64 return new DistributedLock(
65 path,
66 databaseService,
67 clusterService,
68 this);
69 }
70
71 @Override
72 public void addListener(LockEventListener listener) {
73 // FIXME:
74 throw new UnsupportedOperationException();
75 }
76
77 @Override
78 public void removeListener(LockEventListener listener) {
79 // FIXME:
80 throw new UnsupportedOperationException();
81 }
82
83 protected CompletableFuture<Void> lockIfAvailable(Lock lock, long waitTimeMillis, int leaseDurationMillis) {
84 CompletableFuture<Void> future = new CompletableFuture<>();
85 locksToAcquire.put(
86 lock.path(),
87 new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future));
88 return future;
89 }
90
91 private class LockEventMessageListener implements ClusterMessageHandler {
92 @Override
93 public void handle(ClusterMessage message) {
94 TableModificationEvent event = DatabaseStateMachine.SERIALIZER.decode(message.payload());
95 if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)) {
96 return;
97 }
98
99 String path = event.key();
100 if (!locksToAcquire.containsKey(path)) {
101 return;
102 }
103
104 if (event.type() == TableModificationEvent.Type.ROW_DELETED) {
105 List<LockRequest> existingRequests = locksToAcquire.get(path);
106 if (existingRequests == null) return;
107
108 Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
109 while (existingRequestIterator.hasNext()) {
110 LockRequest request = existingRequestIterator.next();
111 if (request.expirationTime().isAfter(DateTime.now())) {
112 existingRequestIterator.remove();
113 } else {
114 if (request.lock().tryLock(request.leaseDurationMillis()) == true) {
115 request.future().complete(null);
116 existingRequests.remove(0);
117 }
118 }
119 }
120 }
121 }
122 }
123
124 private class LockRequest {
125
126 private final Lock lock;
127 private final DateTime expirationTime;
128 private final int leaseDurationMillis;
129 private final CompletableFuture<Void> future;
130
131 public LockRequest(
132 Lock lock,
133 long waitTimeMillis,
134 int leaseDurationMillis,
135 CompletableFuture<Void> future) {
136
137 this.lock = lock;
138 this.expirationTime = DateTime.now().plusMillis((int) waitTimeMillis);
139 this.leaseDurationMillis = leaseDurationMillis;
140 this.future = future;
141 }
142
143 public Lock lock() {
144 return lock;
145 }
146
147 public DateTime expirationTime() {
148 return expirationTime;
149 }
150
151 public int leaseDurationMillis() {
152 return leaseDurationMillis;
153 }
154
155 public CompletableFuture<Void> future() {
156 return future;
157 }
158 }
159}