Reworked DatabaseService API.
Initial implementation of LockManager.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
new file mode 100644
index 0000000..c87ab37
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
@@ -0,0 +1,205 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.joda.time.DateTime;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.service.DatabaseService;
+import org.onlab.onos.store.service.Lock;
+import org.onlab.onos.store.service.LockEventListener;
+import org.onlab.onos.store.service.LockService;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ArrayListMultimap;
+
+/**
+ * Lock service that queues lock requests per path and retries them whenever
+ * a row is deleted from the table named {@link #ONOS_LOCK_TABLE_NAME}.
+ * <p>
+ * Requests are queued by {@link #lockIfAvailable} and drained by the cluster
+ * message handler, so every access to the pending-request multimap is done
+ * while holding that multimap's monitor.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedLockManager implements LockService {
+
+    private final Logger log = getLogger(getClass());
+
+    public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
+
+    // Pending lock requests keyed by lock path. Guarded by synchronized (locksToAcquire).
+    private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap.create();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private DatabaseService databaseService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterService clusterService;
+
+    /**
+     * Subscribes a lock event listener to database update events.
+     */
+    @Activate
+    public void activate() {
+        clusterCommunicator.addSubscriber(
+                DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
+                new LockEventMessageListener());
+        log.info("Started.");
+    }
+
+    /**
+     * Discards all pending lock requests.
+     */
+    @Deactivate
+    public void deactivate() {
+        // NOTE(review): the subscriber added in activate() is never removed;
+        // confirm whether ClusterCommunicationService offers a way to do so.
+        synchronized (locksToAcquire) {
+            locksToAcquire.clear();
+        }
+        log.info("Stopped.");
+    }
+
+    /**
+     * Creates a lock for the given path, backed by this manager.
+     *
+     * @param path lock path
+     * @return new lock instance
+     */
+    @Override
+    public Lock create(String path) {
+        return new DistributedLock(
+                path,
+                databaseService,
+                clusterService,
+                this);
+    }
+
+    @Override
+    public void addListener(LockEventListener listener) {
+        // FIXME: listener registration is not implemented yet.
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void removeListener(LockEventListener listener) {
+        // FIXME: listener removal is not implemented yet.
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Queues a request to acquire the lock once its path is released.
+     * <p>
+     * The returned future is completed (with {@code null}) when a later
+     * row-deleted event for the lock's path leads to a successful
+     * {@code tryLock}. A request whose wait time has elapsed is dropped
+     * without completing the future.
+     *
+     * @param lock lock to acquire
+     * @param waitTimeMillis how long the request stays eligible, in milliseconds
+     * @param leaseDurationMillis lease duration passed to {@code tryLock}
+     * @return future completed when the lock has been acquired
+     */
+    protected CompletableFuture<Void> lockIfAvailable(Lock lock, long waitTimeMillis, int leaseDurationMillis) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        LockRequest request = new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future);
+        synchronized (locksToAcquire) {
+            locksToAcquire.put(lock.path(), request);
+        }
+        return future;
+    }
+
+    /**
+     * Retries pending lock requests when a lock table row is deleted.
+     */
+    private class LockEventMessageListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            TableModificationEvent event = DatabaseStateMachine.SERIALIZER.decode(message.payload());
+            if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)) {
+                return;
+            }
+            if (event.type() != TableModificationEvent.Type.ROW_DELETED) {
+                return;
+            }
+
+            String path = event.key();
+            synchronized (locksToAcquire) {
+                // Multimap.get() returns a live, possibly empty view (never null),
+                // so removals through the iterator also update locksToAcquire.
+                List<LockRequest> pendingRequests = locksToAcquire.get(path);
+                Iterator<LockRequest> pendingIterator = pendingRequests.iterator();
+                while (pendingIterator.hasNext()) {
+                    LockRequest request = pendingIterator.next();
+                    if (DateTime.now().isAfter(request.expirationTime())) {
+                        // The wait time has elapsed: stop retrying this request.
+                        pendingIterator.remove();
+                    } else if (request.lock().tryLock(request.leaseDurationMillis())) {
+                        request.future().complete(null);
+                        // Remove via the iterator; removing through the list here
+                        // would drop the wrong element and break the iteration.
+                        pendingIterator.remove();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * A queued attempt to acquire a lock, with its deadline and result future.
+     */
+    private static final class LockRequest {
+
+        private final Lock lock;
+        private final DateTime expirationTime;
+        private final int leaseDurationMillis;
+        private final CompletableFuture<Void> future;
+
+        public LockRequest(
+                Lock lock,
+                long waitTimeMillis,
+                int leaseDurationMillis,
+                CompletableFuture<Void> future) {
+
+            this.lock = lock;
+            // plus(long) keeps the full long range; casting to int would overflow
+            // for waits longer than Integer.MAX_VALUE milliseconds.
+            this.expirationTime = DateTime.now().plus(waitTimeMillis);
+            this.leaseDurationMillis = leaseDurationMillis;
+            this.future = future;
+        }
+
+        public Lock lock() {
+            return lock;
+        }
+
+        public DateTime expirationTime() {
+            return expirationTime;
+        }
+
+        public int leaseDurationMillis() {
+            return leaseDurationMillis;
+        }
+
+        public CompletableFuture<Void> future() {
+            return future;
+        }
+    }
+}
\ No newline at end of file