Reworked DatabaseService API.
Initial implementation of LockManager.
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java b/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
index 8beed85..db8514d 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
@@ -15,8 +15,13 @@
*/
package org.onlab.onos.foo;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static org.onlab.util.Tools.namedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
import java.nio.ByteBuffer;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -37,20 +42,10 @@
import org.onlab.onos.net.intent.IntentListener;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.store.service.DatabaseAdminService;
-import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.DatabaseService;
-import org.onlab.onos.store.service.OptionalResult;
-import org.onlab.onos.store.service.PreconditionFailedException;
-import org.onlab.onos.store.service.ReadRequest;
-import org.onlab.onos.store.service.ReadResult;
-import org.onlab.onos.store.service.WriteRequest;
-import org.onlab.onos.store.service.WriteResult;
+import org.onlab.onos.store.service.VersionedValue;
import org.slf4j.Logger;
-import static org.onlab.util.Tools.namedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-import static java.util.concurrent.Executors.newScheduledThreadPool;
-
/**
* Playground app component.
*/
@@ -97,9 +92,9 @@
log.info("Couldn't find DB service");
} else {
log.info("Found DB service");
-// longIncrementor();
-// executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
-// executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
+ longIncrementor();
+ executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
+ executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
}
log.info("Started");
}
@@ -164,44 +159,33 @@
dbAdminService.createTable(someTable);
- ReadResult read = dbService.read(ReadRequest.get(someTable, someKey));
- if (!read.valueExists()) {
+ VersionedValue vv = dbService.get(someTable, someKey);
+ if (vv == null) {
ByteBuffer zero = ByteBuffer.allocate(Long.BYTES).putLong(0);
- try {
- dbService.write(WriteRequest
- .putIfAbsent(someTable,
- someKey,
- zero.array()));
- log.info("Wrote initial value");
- read = dbService.read(ReadRequest.get(someTable, someKey));
- } catch (PreconditionFailedException e) {
- log.info("Concurrent write detected.", e);
-
- // concurrent write detected, read and fall through
- read = dbService.read(ReadRequest.get(someTable, someKey));
- if (!read.valueExists()) {
- log.error("Shouldn't reach here");
- }
+ if (dbService.putIfAbsent(someTable, someKey, zero.array())) {
+ log.info("Wrote initial value");
+ vv = dbService.get(someTable, someKey);
+ } else {
+ log.info("Concurrent write detected.");
+ // concurrent write detected, read and fall through
+ vv = dbService.get(someTable, someKey);
+ if (vv == null) {
+ log.error("Shouldn't reach here");
+ }
}
}
int retry = 5;
do {
- ByteBuffer prev = ByteBuffer.wrap(read.value().value());
+ ByteBuffer prev = ByteBuffer.wrap(vv.value());
long next = prev.getLong() + 1;
byte[] newValue = ByteBuffer.allocate(Long.BYTES).putLong(next).array();
- OptionalResult<WriteResult, DatabaseException> result
- = dbService.writeNothrow(WriteRequest
- .putIfVersionMatches(someTable,
- someKey,
- newValue,
- read.value().version()));
- if (result.hasValidResult()) {
- log.info("Write success {} -> {}", result.get().previousValue(), next);
+ if (dbService.putIfVersionMatches(someTable, someKey, newValue, vv.version())) {
+ log.info("Write success. New value: {}", next);
break;
} else {
log.info("Write failed trying to write {}", next);
- read = dbService.read(ReadRequest.get(someTable, someKey));
- if (!read.valueExists()) {
+ vv = dbService.get(someTable, someKey);
+ if (vv == null) {
log.error("Shouldn't reach here");
}
}
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/BatchReadRequest.java b/core/api/src/main/java/org/onlab/onos/store/service/BatchReadRequest.java
new file mode 100644
index 0000000..1cf422c
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/store/service/BatchReadRequest.java
@@ -0,0 +1,70 @@
+package org.onlab.onos.store.service;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Collection of read requests to be submitted as one batch.
+ */
+public class BatchReadRequest {
+
+ private final List<ReadRequest> readRequests;
+
+ /**
+ * Creates a new BatchReadRequest object from the specified list of read requests.
+ * @param readRequests read requests.
+ * @return BatchReadRequest object.
+ */
+ public static BatchReadRequest create(List<ReadRequest> readRequests) {
+ return new BatchReadRequest(readRequests);
+ }
+
+ private BatchReadRequest(List<ReadRequest> readRequests) {
+ this.readRequests = Collections.unmodifiableList(readRequests);
+ }
+
+ /**
+ * Returns the number of requests in this batch.
+ * @return size of request batch.
+ */
+ public int batchSize() {
+ return readRequests.size();
+ }
+
+ /**
+ * Returns the requests in this batch as a list.
+ * @return list of read requests
+ */
+ public List<ReadRequest> getAsList() {
+ return readRequests;
+ }
+
+ /**
+ * Builder for BatchReadRequest.
+ */
+ public static class Builder {
+
+ private final List<ReadRequest> readRequests = Lists.newLinkedList();
+
+ /**
+ * Append a get request.
+ * @param tableName table name
+ * @param key key to fetch.
+ * @return this Builder
+ */
+ public Builder get(String tableName, String key) {
+ readRequests.add(new ReadRequest(tableName, key));
+ return this;
+ }
+
+ /**
+ * Builds a BatchReadRequest
+ * @return BatchReadRequest
+ */
+ public BatchReadRequest build() {
+ return new BatchReadRequest(readRequests);
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/BatchReadResult.java b/core/api/src/main/java/org/onlab/onos/store/service/BatchReadResult.java
new file mode 100644
index 0000000..7281eff
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/store/service/BatchReadResult.java
@@ -0,0 +1,21 @@
+package org.onlab.onos.store.service;
+
+import java.util.Collections;
+import java.util.List;
+
+public class BatchReadResult {
+
+ private final List<ReadResult> readResults;
+
+ public BatchReadResult(List<ReadResult> readResults) {
+ this.readResults = Collections.unmodifiableList(readResults);
+ }
+
+ public List<ReadResult> getAsList() {
+ return readResults;
+ }
+
+ public int batchSize() {
+ return readResults.size();
+ }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteRequest.java b/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteRequest.java
new file mode 100644
index 0000000..9ce14a1
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteRequest.java
@@ -0,0 +1,90 @@
+package org.onlab.onos.store.service;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Collection of write requests to be submitted as one batch.
+ */
+public class BatchWriteRequest {
+
+ private final List<WriteRequest> writeRequests;
+
+ /**
+ * Creates a new BatchWriteRequest object from the specified list of write requests.
+ * @param writeRequests write requests.
+ * @return BatchWriteRequest object.
+ */
+ public static BatchWriteRequest create(List<WriteRequest> writeRequests) {
+ return new BatchWriteRequest(writeRequests);
+ }
+
+ private BatchWriteRequest(List<WriteRequest> writeRequests) {
+ this.writeRequests = Collections.unmodifiableList(writeRequests);
+ }
+
+ /**
+ * Returns the requests in this batch as a list.
+ * @return list of write requests
+ */
+ public List<WriteRequest> getAsList() {
+ return writeRequests;
+ }
+
+ /**
+ * Returns the number of requests in this batch.
+ * @return size of request batch.
+ */
+ public int batchSize() {
+ return writeRequests.size();
+ }
+
+ /**
+ * Builder for BatchWriteRequest.
+ */
+ public static class Builder {
+
+ private final List<WriteRequest> writeRequests = Lists.newLinkedList();
+
+ public Builder put(String tableName, String key, byte[] value) {
+ writeRequests.add(WriteRequest.put(tableName, key, value));
+ return this;
+ }
+
+ public Builder putIfAbsent(String tableName, String key, byte[] value) {
+ writeRequests.add(WriteRequest.putIfAbsent(tableName, key, value));
+ return this;
+ }
+
+ public Builder putIfValueMatches(String tableName, String key, byte[] oldValue, byte[] newValue) {
+ writeRequests.add(WriteRequest.putIfValueMatches(tableName, key, oldValue, newValue));
+ return this;
+ }
+
+ public Builder putIfVersionMatches(String tableName, String key, byte[] value, long version) {
+ writeRequests.add(WriteRequest.putIfVersionMatches(tableName, key, value, version));
+ return this;
+ }
+
+ public Builder remove(String tableName, String key) {
+ writeRequests.add(WriteRequest.remove(tableName, key));
+ return this;
+ }
+
+ public Builder removeIfVersionMatches(String tableName, String key, long version) {
+ writeRequests.add(WriteRequest.removeIfVersionMatches(tableName, key, version));
+ return this;
+ }
+
+ public Builder removeIfValueMatches(String tableName, String key, byte[] value) {
+ writeRequests.add(WriteRequest.removeIfValueMatches(tableName, key, value));
+ return this;
+ }
+
+ public BatchWriteRequest build() {
+ return new BatchWriteRequest(writeRequests);
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteResult.java b/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteResult.java
new file mode 100644
index 0000000..22c653c
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteResult.java
@@ -0,0 +1,30 @@
+package org.onlab.onos.store.service;
+
+import java.util.Collections;
+import java.util.List;
+
+public class BatchWriteResult {
+
+ private final List<WriteResult> writeResults;
+
+ public BatchWriteResult(List<WriteResult> writeResults) {
+ this.writeResults = Collections.unmodifiableList(writeResults);
+ }
+
+ public boolean isSuccessful() {
+ for (WriteResult result : writeResults) {
+ if (result.status() != WriteStatus.OK) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public List<WriteResult> getAsList() {
+ return this.writeResults;
+ }
+
+ public int batchSize() {
+ return writeResults.size();
+ }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseService.java b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseService.java
index cf0ef0a..57d81f2 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseService.java
@@ -1,7 +1,5 @@
package org.onlab.onos.store.service;
-import java.util.List;
-
/**
* Service interface for a strongly consistent and durable
* key value data store.
@@ -9,46 +7,93 @@
public interface DatabaseService {
/**
- * Performs a read on the database.
- * @param request read request.
- * @return ReadResult
- * @throws DatabaseException if there is a failure in executing read.
+ * Reads the specified key.
+ * @param tableName name of the table associated with this operation.
+ * @return key key to read.
+ * @returns value (and version) associated with this key. This calls returns null if the key does not exist.
*/
- ReadResult read(ReadRequest request);
-
+ VersionedValue get(String tableName, String key);
+
/**
- * Performs a batch read operation on the database.
- * The main advantage of batch read operation is parallelization.
- * @param batch batch of read requests to execute.
- * @return batch read result.
+ * Associate the key with a value.
+ * @param tableName table name in which this key/value resides.
+ * @param key key with which the specified value is to be associated
+ * @param value value to be associated with the specified key
+ * @return the previous value associated with the specified key, or null if there was no mapping for the key.
*/
- List<OptionalResult<ReadResult, DatabaseException>> batchRead(List<ReadRequest> batch);
-
- // FIXME Give me a better name
+ VersionedValue put(String tableName, String key, byte[] value);
+
/**
- * Performs a write operation on the database.
- * @param request write request
- * @return write result.
- * @throws DatabaseException if there is failure in execution write.
+ * If the specified key is not already associated with a value, associate it with the given value.
+ * @param tableName table name in which this key/value resides.
+ * @param key key with which the specified value is to be associated
+ * @param value value to be associated with the specified key
+ * @return true if put was successful, false if there is already a value associated with this key
*/
- OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request);
-
+ boolean putIfAbsent(String tableName, String key, byte[] value);
+
/**
- * Performs a write operation on the database.
- * @param request write request
- * @return write result.
- * @throws OptimisticLockException FIXME define conditional failure
- * @throws PreconditionFailedException FIXME define conditional failure
- * @throws DatabaseException if there is failure in execution write.
+ * Sets the key to the specified value if the version in the database (for that key)
+ * matches the specified version.
+ * @param tableName name of table associated with this operation.
+ * @param key key
+ * @param value value
+ * @param version version that should present in the database for the put to be successful.
+ * @return true if put was successful, false if there version in database is different from what is specified.
*/
- WriteResult write(WriteRequest request)/* throws OptimisticLockException, PreconditionFailedException*/;
-
+ boolean putIfVersionMatches(String tableName, String key, byte[] value, long version);
+
/**
- * Performs a batch write operation on the database.
- * Batch write provides transactional semantics. Either all operations
- * succeed or none of them do.
- * @param batch batch of write requests to execute as a transaction.
- * @return result of executing the batch write operation.
+ * Replaces the entry for a key only if currently mapped to a given value.
+ * @param tableName name of table associated with this operation.
+ * @param key with which the specified value is associated
+ * @param oldValue value expected to be associated with the specified key
+ * @param newValue value to be associated with the specified key
+ * @return true if put was successful, false if there version in database is different from what is specified.
*/
- List<OptionalResult<WriteResult, DatabaseException>> batchWrite(List<WriteRequest> batch);
-}
+ boolean putIfValueMatches(String tableName, String key, byte[] oldValue, byte[] newValue);
+
+ /**
+ * Removes the key (and associated value).
+ * @param tableName name of table associated with this operation.
+ * @param key key to remove
+ * @return value previously associated with the key. This call returns null if the key does not exist.
+ */
+ VersionedValue remove(String tableName, String key);
+
+ /**
+ * Removes the key (and associated value) if the version in the database matches specified version.
+ * @param tableName name of table associated with this operation.
+ * @param key key to remove
+ * @param version version that should present in the database for the remove to be successful.
+ * @return true if remove was successful, false if there version in database is different from what is specified.
+ */
+ boolean removeIfVersionMatches(String tableName, String key, long version);
+
+ /**
+ * Removes the key (and associated value) if the value in the database matches specified value.
+ * @param tableName name of table associated with this operation.
+ * @param key key to remove
+ * @param value value that should present in the database for the remove to be successful.
+ * @return true if remove was successful, false if there value in database is different from what is specified.
+ */
+ boolean removeIfValueMatches(String tableName, String key, byte[] value);
+
+ /**
+ * Performs a batch read operation and returns the results.
+ * @param batchRequest batch request.
+ * @return result of the batch operation.
+ */
+ BatchReadResult batchRead(BatchReadRequest batchRequest);
+
+ /**
+ * Performs a batch write operation and returns the results.
+ * This method provides transactional semantics. Either all writes succeed or none do.
+ * Even a single write failure would cause the entire batch to be aborted.
+ * In the case of unsuccessful operation, the batch result can be inspected to determine
+ * which operation(s) caused the batch to fail.
+ * @param batchRequest batch request.
+ * @return result of the batch operation.
+ */
+ BatchWriteResult batchWrite(BatchWriteRequest batchRequest);
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/Lock.java b/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
index 6d68133..510df82 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
@@ -9,6 +9,12 @@
*/
public interface Lock {
+ /**
+ * Returns the path this lock will be used to guard from concurrent access.
+ * @return path.
+ */
+ String path();
+
/**
* Acquires the lock.
* If the lock is not available then the caller thread becomes
@@ -26,7 +32,7 @@
* already been released by invoking unlock(). Must be in the range
* (0, LockManager.MAX_LEASE_MILLIS]
*/
- void lock(long leaseDurationMillis);
+ void lock(int leaseDurationMillis);
/**
* Acquires the lock only if it is free at the time of invocation.
@@ -36,7 +42,7 @@
* (0, LockManager.MAX_LEASE_MILLIS]
* @return true if the lock was acquired and false otherwise
*/
- boolean tryLock(long leaseDurationMillis);
+ boolean tryLock(int leaseDurationMillis);
/**
* Acquires the lock if it is free within the given waiting
@@ -49,7 +55,7 @@
* @return true if the lock was acquired and false if the waiting time
* elapsed before the lock was acquired
*/
- boolean tryLock(long waitTimeMillis, long leaseDurationMillis);
+ boolean tryLock(long waitTimeMillis, int leaseDurationMillis);
/**
* Returns true if this Lock instance currently holds the lock.
@@ -72,5 +78,5 @@
* @return true if successfully extended expiration, false if attempt to
* extend expiration fails or if the path is currently not locked by this instance.
*/
- boolean extendExpiration(long leaseDurationMillis);
+ boolean extendExpiration(int leaseDurationMillis);
}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/ReadRequest.java b/core/api/src/main/java/org/onlab/onos/store/service/ReadRequest.java
index 9f7af4a..f30529e 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/ReadRequest.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/ReadRequest.java
@@ -75,5 +75,4 @@
return Objects.equals(this.key, other.key) &&
Objects.equals(this.tableName, other.tableName);
}
-
}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/ReadResult.java b/core/api/src/main/java/org/onlab/onos/store/service/ReadResult.java
index 49e9665..3f253f2 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/ReadResult.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/ReadResult.java
@@ -11,12 +11,21 @@
private final String tableName;
private final String key;
private final VersionedValue value;
+ private final ReadStatus status;
- public ReadResult(String tableName, String key, VersionedValue value) {
+ public ReadResult(ReadStatus status, String tableName, String key, VersionedValue value) {
+ this.status = status;
this.tableName = tableName;
this.key = key;
this.value = value;
}
+
+ /**
+ * Returns the status of the read operation.
+ */
+ public ReadStatus status() {
+ return status;
+ }
/**
* Returns database table name.
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/ReadStatus.java b/core/api/src/main/java/org/onlab/onos/store/service/ReadStatus.java
new file mode 100644
index 0000000..2039a1c
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/store/service/ReadStatus.java
@@ -0,0 +1,6 @@
+package org.onlab.onos.store.service;
+
+public enum ReadStatus {
+ OK,
+ NO_SUCH_TABLE
+}
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java b/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java
index fb736e5..6607cfe 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java
@@ -112,7 +112,7 @@
* @param previousVersion previous version expected
* @return WriteRequest
*/
- public static WriteRequest remove(String tableName, String key,
+ public static WriteRequest removeIfVersionMatches(String tableName, String key,
long previousVersion) {
return new WriteRequest(REMOVE_IF_VALUE, tableName, key,
null, previousVersion, null);
@@ -127,7 +127,7 @@
* @param oldValue previous value expected, must not be null
* @return WriteRequest
*/
- public static WriteRequest remove(String tableName, String key,
+ public static WriteRequest removeIfValueMatches(String tableName, String key,
byte[] oldValue) {
return new WriteRequest(Type.REMOVE_IF_VALUE, tableName, key,
null, ANY_VERSION, checkNotNull(oldValue));
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/WriteResult.java b/core/api/src/main/java/org/onlab/onos/store/service/WriteResult.java
index aec3046..3cc11b0 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/WriteResult.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/WriteResult.java
@@ -7,34 +7,27 @@
* Database write result.
*/
public class WriteResult {
-
- private final String tableName;
- private final String key;
+
+ private final WriteStatus status;
private final VersionedValue previousValue;
-
- public WriteResult(String tableName, String key, VersionedValue previousValue) {
- this.tableName = tableName;
- this.key = key;
+
+ public WriteResult(WriteStatus status, VersionedValue previousValue) {
+ this.status = status;
this.previousValue = previousValue;
}
- public String tableName() {
- return tableName;
- }
-
- public String key() {
- return key;
- }
-
public VersionedValue previousValue() {
return previousValue;
}
+
+ public WriteStatus status() {
+ return status;
+ }
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("tableName", tableName)
- .add("key", key)
+ .add("status", status)
.add("previousValue", previousValue)
.toString();
}
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/WriteStatus.java b/core/api/src/main/java/org/onlab/onos/store/service/WriteStatus.java
new file mode 100644
index 0000000..ebb4f6d
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/store/service/WriteStatus.java
@@ -0,0 +1,8 @@
+package org.onlab.onos.store.service;
+
+public enum WriteStatus {
+ OK,
+ ABORTED,
+ PRECONDITION_VIOLATION,
+ NO_SUCH_TABLE,
+}
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index f0f37c7..10196c2 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -103,6 +103,12 @@
<artifactId>hazelcast</artifactId>
</dependency>
+ <dependency>
+ <groupId>net.jodah</groupId>
+ <artifactId>expiringmap</artifactId>
+ <version>0.3.1</version>
+ </dependency>
+
<!-- for shaded copycat -->
<dependency>
<groupId>org.onlab.onos</groupId>
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index c749197..77ff062 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -8,9 +8,11 @@
import net.kuujo.copycat.Copycat;
+import org.onlab.onos.store.service.BatchReadRequest;
+import org.onlab.onos.store.service.BatchWriteRequest;
import org.onlab.onos.store.service.DatabaseException;
-import org.onlab.onos.store.service.ReadRequest;
-import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.WriteResult;
/**
* Client for interacting with the Copycat Raft cluster.
@@ -63,9 +65,9 @@
}
}
- public List<InternalReadResult> batchRead(List<ReadRequest> requests) {
+ public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
- CompletableFuture<List<InternalReadResult>> future = copycat.submit("read", requests);
+ CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
@@ -73,9 +75,9 @@
}
}
- public List<InternalWriteResult> batchWrite(List<WriteRequest> requests) {
+ public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
- CompletableFuture<List<InternalWriteResult>> future = copycat.submit("write", requests);
+ CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index afeaef9..e3de134 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -4,8 +4,6 @@
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -22,7 +20,6 @@
import net.kuujo.copycat.cluster.TcpCluster;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.log.InMemoryLog;
import net.kuujo.copycat.log.Log;
import org.apache.felix.scr.annotations.Activate;
@@ -37,17 +34,18 @@
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.service.BatchReadRequest;
+import org.onlab.onos.store.service.BatchReadResult;
+import org.onlab.onos.store.service.BatchWriteRequest;
+import org.onlab.onos.store.service.BatchWriteResult;
import org.onlab.onos.store.service.DatabaseAdminService;
import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.DatabaseService;
-import org.onlab.onos.store.service.NoSuchTableException;
-import org.onlab.onos.store.service.OptimisticLockException;
-import org.onlab.onos.store.service.OptionalResult;
-import org.onlab.onos.store.service.ReadRequest;
import org.onlab.onos.store.service.ReadResult;
-import org.onlab.onos.store.service.WriteAborted;
-import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.ReadStatus;
+import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.WriteResult;
+import org.onlab.onos.store.service.WriteStatus;
import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
@@ -199,66 +197,121 @@
}
@Override
- public ReadResult read(ReadRequest request) {
- return batchRead(Arrays.asList(request)).get(0).get();
- }
-
- @Override
- public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
- List<ReadRequest> batch) {
- List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
- for (InternalReadResult internalReadResult : client.batchRead(batch)) {
- if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
- readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
- new NoSuchTableException()));
- } else {
- readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
- internalReadResult.result()));
- }
+ public VersionedValue get(String tableName, String key) {
+ BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
+ ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
+ if (readResult.status().equals(ReadStatus.OK)) {
+ return readResult.value();
}
- return readResults;
+ throw new DatabaseException("get failed due to status: " + readResult.status());
}
@Override
- public OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request) {
- return batchWrite(Arrays.asList(request)).get(0);
+ public BatchReadResult batchRead(BatchReadRequest batchRequest) {
+ return new BatchReadResult(client.batchRead(batchRequest));
}
@Override
- public WriteResult write(WriteRequest request) {
-// throws OptimisticLockException, PreconditionFailedException {
- return writeNothrow(request).get();
+ public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
+ return new BatchWriteResult(client.batchWrite(batchRequest));
}
@Override
- public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
- List<WriteRequest> batch) {
- List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
- for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
- if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
- writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
- new NoSuchTableException()));
- } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH) {
- writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
- new OptimisticLockException()));
- } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
- // TODO: throw a different exception?
- writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
- new OptimisticLockException()));
- } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
- writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
- new WriteAborted()));
- } else {
- writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
- internalWriteResult.result()));
- }
+ public VersionedValue put(String tableName, String key, byte[] value) {
+ BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
+ WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+ if (writeResult.status().equals(WriteStatus.OK)) {
+ return writeResult.previousValue();
}
- return writeResults;
+ throw new DatabaseException("put failed due to status: " + writeResult.status());
+ }
+ @Override
+ public boolean putIfAbsent(String tableName, String key, byte[] value) {
+ BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().putIfAbsent(tableName, key, value).build();
+ WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+ if (writeResult.status().equals(WriteStatus.OK)) {
+ return true;
+ } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+ return false;
+ }
+ throw new DatabaseException("putIfAbsent failed due to status: " + writeResult.status());
+ }
+
+ @Override
+ public boolean putIfVersionMatches(String tableName, String key,
+ byte[] value, long version) {
+ BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().putIfVersionMatches(tableName, key, value, version).build();
+ WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+ if (writeResult.status().equals(WriteStatus.OK)) {
+ return true;
+ } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+ return false;
+ }
+ throw new DatabaseException("putIfVersionMatches failed due to status: " + writeResult.status());
+ }
+
+ @Override
+ public boolean putIfValueMatches(String tableName, String key,
+ byte[] oldValue, byte[] newValue) {
+ BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().putIfValueMatches(tableName, key, oldValue, newValue).build();
+ WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+ if (writeResult.status().equals(WriteStatus.OK)) {
+ return true;
+ } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+ return false;
+ }
+ throw new DatabaseException("putIfValueMatches failed due to status: " + writeResult.status());
+ }
+
+ @Override
+ public VersionedValue remove(String tableName, String key) {
+ BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().remove(tableName, key).build();
+ WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+ if (writeResult.status().equals(WriteStatus.OK)) {
+ return writeResult.previousValue();
+ }
+ throw new DatabaseException("remove failed due to status: " + writeResult.status());
+ }
+
+ @Override
+ public boolean removeIfVersionMatches(String tableName, String key,
+ long version) {
+ BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().removeIfVersionMatches(tableName, key, version).build();
+ WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+ if (writeResult.status().equals(WriteStatus.OK)) {
+ return true;
+ } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+ return false;
+ }
+ throw new DatabaseException("removeIfVersionMatches failed due to status: " + writeResult.status());
+ }
+
+ @Override
+ public boolean removeIfValueMatches(String tableName, String key,
+ byte[] value) {
+ BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().removeIfValueMatches(tableName, key, value).build();
+ WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+ if (writeResult.status().equals(WriteStatus.OK)) {
+ return true;
+ } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+ return false;
+ }
+ throw new DatabaseException("removeIfValueMatches failed due to status: " + writeResult.status());
+ }
+
+ @Override
+ public void addMember(final ControllerNode node) {
+ final TcpMember tcpMember = new TcpMember(node.ip().toString(),
+ node.tcpPort());
+ log.info("{} was added to the cluster", tcpMember);
+ synchronized (clusterConfig) {
+ clusterConfig.addRemoteMember(tcpMember);
+ }
}
private final class InternalClusterEventListener
- implements ClusterEventListener {
+ implements ClusterEventListener {
@Override
public void event(ClusterEvent event) {
@@ -266,7 +319,7 @@
final ControllerNode node = event.subject();
final TcpMember tcpMember = new TcpMember(node.ip().toString(),
- node.tcpPort());
+ node.tcpPort());
switch (event.type()) {
case INSTANCE_ACTIVATED:
@@ -284,8 +337,8 @@
case INSTANCE_REMOVED:
if (autoAddMember) {
Set<DefaultControllerNode> members
- = tabletMembers.getOrDefault(DEFAULT_TABLET,
- Collections.emptySet());
+ = tabletMembers.getOrDefault(DEFAULT_TABLET,
+ Collections.emptySet());
// remove only if not the initial members
if (!members.contains(node)) {
synchronized (clusterConfig) {
@@ -308,63 +361,6 @@
}
- public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
- public KryoRegisteredInMemoryLog() {
- super();
- // required to deserialize object across bundles
- super.kryo.register(TcpMember.class, new TcpMemberSerializer());
- super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
- }
- }
-
- private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
-
- private final R result;
- private final DatabaseException exception;
-
- public DatabaseOperationResult(R result) {
- this.result = result;
- this.exception = null;
- }
-
- public DatabaseOperationResult(DatabaseException exception) {
- this.result = null;
- this.exception = exception;
- }
-
- @Override
- public R get() {
- if (result != null) {
- return result;
- }
- throw exception;
- }
-
- @Override
- public boolean hasValidResult() {
- return result != null;
- }
-
- @Override
- public String toString() {
- if (result != null) {
- return result.toString();
- } else {
- return exception.toString();
- }
- }
- }
-
- @Override
- public void addMember(final ControllerNode node) {
- final TcpMember tcpMember = new TcpMember(node.ip().toString(),
- node.tcpPort());
- log.info("{} was added to the cluster", tcpMember);
- synchronized (clusterConfig) {
- clusterConfig.addRemoteMember(tcpMember);
- }
- }
-
@Override
public void removeMember(final ControllerNode node) {
final TcpMember tcpMember = new TcpMember(node.ip().toString(),
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 50d1203..17b174c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -8,6 +8,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
@@ -15,16 +16,21 @@
import net.kuujo.copycat.Query;
import net.kuujo.copycat.StateMachine;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.service.BatchReadRequest;
+import org.onlab.onos.store.service.BatchWriteRequest;
import org.onlab.onos.store.service.ReadRequest;
import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.ReadStatus;
import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
-import org.onlab.onos.store.service.impl.InternalWriteResult.Status;
+import org.onlab.onos.store.service.WriteStatus;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
+import com.beust.jcommander.internal.Lists;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
@@ -40,6 +46,10 @@
private final Logger log = getLogger(getClass());
+ // message subject for database update notifications.
+ public static MessageSubject DATABASE_UPDATE_EVENTS =
+ new MessageSubject("database-update-events");
+
// serializer used for snapshot
public static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
@@ -47,29 +57,72 @@
serializerPool = KryoNamespace.newBuilder()
.register(VersionedValue.class)
.register(State.class)
+ .register(BatchReadRequest.class)
+ .register(BatchWriteRequest.class)
+ .register(ReadStatus.class)
+ .register(WriteStatus.class)
+ // TODO: Move this out ?
+ .register(TableModificationEvent.class)
.register(ClusterMessagingProtocol.COMMON)
.build()
.populate(1);
}
};
+ private final List<DatabaseUpdateEventListener> listeners = Lists.newLinkedList();
+
+ // durable internal state of the database.
private State state = new State();
private boolean compressSnapshot = false;
@Command
public boolean createTable(String tableName) {
- return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null;
+ Map<String, VersionedValue> existingTable =
+ state.getTables().putIfAbsent(tableName, Maps.newHashMap());
+ if (existingTable == null) {
+ for (DatabaseUpdateEventListener listener : listeners) {
+ listener.tableCreated(tableName, Integer.MAX_VALUE);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Command
+ public boolean createTable(String tableName, int expirationTimeMillis) {
+ Map<String, VersionedValue> existingTable =
+ state.getTables().putIfAbsent(tableName, Maps.newHashMap());
+ if (existingTable == null) {
+ for (DatabaseUpdateEventListener listener : listeners) {
+ listener.tableCreated(tableName, expirationTimeMillis);
+ }
+ return true;
+ }
+ return false;
}
@Command
public boolean dropTable(String tableName) {
- return state.getTables().remove(tableName) != null;
+ Map<String, VersionedValue> table = state.getTables().remove(tableName);
+ if (table != null) {
+ for (DatabaseUpdateEventListener listener : listeners) {
+ listener.tableDeleted(tableName);
+ }
+ return true;
+ }
+ return false;
}
@Command
public boolean dropAllTables() {
+ Set<String> tableNames = state.getTables().keySet();
state.getTables().clear();
+ for (DatabaseUpdateEventListener listener : listeners) {
+ for (String tableName : tableNames) {
+ listener.tableDeleted(tableName);
+ }
+ }
return true;
}
@@ -79,96 +132,95 @@
}
@Query
- public List<InternalReadResult> read(List<ReadRequest> requests) {
- List<InternalReadResult> results = new ArrayList<>(requests.size());
- for (ReadRequest request : requests) {
+ public List<ReadResult> read(BatchReadRequest batchRequest) {
+ List<ReadResult> results = new ArrayList<>(batchRequest.batchSize());
+ for (ReadRequest request : batchRequest.getAsList()) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
if (table == null) {
- results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
+ results.add(new ReadResult(ReadStatus.NO_SUCH_TABLE, request.tableName(), request.key(), null));
continue;
}
VersionedValue value = VersionedValue.copy(table.get(request.key()));
- results.add(new InternalReadResult(
- InternalReadResult.Status.OK,
- new ReadResult(
- request.tableName(),
- request.key(),
- value)));
+ results.add(new ReadResult(ReadStatus.OK, request.tableName(), request.key(), value));
}
return results;
}
- InternalWriteResult.Status checkIfApplicable(WriteRequest request,
- VersionedValue value) {
+ WriteStatus checkIfApplicable(WriteRequest request,
+ VersionedValue value) {
switch (request.type()) {
case PUT:
- return InternalWriteResult.Status.OK;
+ return WriteStatus.OK;
case PUT_IF_ABSENT:
if (value == null) {
- return InternalWriteResult.Status.OK;
+ return WriteStatus.OK;
}
- return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
+ return WriteStatus.PRECONDITION_VIOLATION;
case PUT_IF_VALUE:
case REMOVE_IF_VALUE:
if (value != null && Arrays.equals(value.value(), request.oldValue())) {
- return InternalWriteResult.Status.OK;
+ return WriteStatus.OK;
}
- return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
+ return WriteStatus.PRECONDITION_VIOLATION;
case PUT_IF_VERSION:
case REMOVE_IF_VERSION:
if (value != null && request.previousVersion() == value.version()) {
- return InternalWriteResult.Status.OK;
+ return WriteStatus.OK;
}
- return InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH;
+ return WriteStatus.PRECONDITION_VIOLATION;
case REMOVE:
- return InternalWriteResult.Status.OK;
+ return WriteStatus.OK;
default:
break;
}
log.error("Should never reach here {}", request);
- return InternalWriteResult.Status.ABORTED;
+ return WriteStatus.ABORTED;
}
@Command
- public List<InternalWriteResult> write(List<WriteRequest> requests) {
+ public List<WriteResult> write(BatchWriteRequest batchRequest) {
// applicability check
boolean abort = false;
- List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
- for (WriteRequest request : requests) {
+ List<WriteStatus> validationResults = new ArrayList<>(batchRequest.batchSize());
+ for (WriteRequest request : batchRequest.getAsList()) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
if (table == null) {
- validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE);
+ validationResults.add(WriteStatus.NO_SUCH_TABLE);
abort = true;
continue;
}
final VersionedValue value = table.get(request.key());
- Status result = checkIfApplicable(request, value);
+ WriteStatus result = checkIfApplicable(request, value);
validationResults.add(result);
- if (result != Status.OK) {
+ if (result != WriteStatus.OK) {
abort = true;
}
}
- List<InternalWriteResult> results = new ArrayList<>(requests.size());
+ List<WriteResult> results = new ArrayList<>(batchRequest.batchSize());
if (abort) {
- for (InternalWriteResult.Status validationResult : validationResults) {
- if (validationResult == InternalWriteResult.Status.OK) {
+ for (WriteStatus validationResult : validationResults) {
+ if (validationResult == WriteStatus.OK) {
// aborted due to applicability check failure on other request
- results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
+ results.add(new WriteResult(WriteStatus.ABORTED, null));
} else {
- results.add(new InternalWriteResult(validationResult, null));
+ results.add(new WriteResult(validationResult, null));
}
}
return results;
}
+ List<TableModificationEvent> tableModificationEvents = Lists.newLinkedList();
+
// apply changes
- for (WriteRequest request : requests) {
+ for (WriteRequest request : batchRequest.getAsList()) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+
+ TableModificationEvent tableModificationEvent = null;
// FIXME: If this method could be called by multiple thread,
// synchronization scope is wrong.
// Whole function including applicability check needs to be protected.
@@ -182,16 +234,23 @@
case PUT_IF_VERSION:
VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion());
VersionedValue previousValue = table.put(request.key(), newValue);
- WriteResult putResult = new WriteResult(request.tableName(), request.key(), previousValue);
- results.add(InternalWriteResult.ok(putResult));
+ WriteResult putResult = new WriteResult(WriteStatus.OK, previousValue);
+ results.add(putResult);
+ tableModificationEvent = (previousValue == null) ?
+ TableModificationEvent.rowAdded(request.tableName(), request.key()) :
+ TableModificationEvent.rowUpdated(request.tableName(), request.key());
break;
case REMOVE:
case REMOVE_IF_VALUE:
case REMOVE_IF_VERSION:
VersionedValue removedValue = table.remove(request.key());
- WriteResult removeResult = new WriteResult(request.tableName(), request.key(), removedValue);
- results.add(InternalWriteResult.ok(removeResult));
+ WriteResult removeResult = new WriteResult(WriteStatus.OK, removedValue);
+ results.add(removeResult);
+ if (removedValue != null) {
+ tableModificationEvent =
+ TableModificationEvent.rowDeleted(request.tableName(), request.key());
+ }
break;
default:
@@ -199,7 +258,19 @@
break;
}
}
+
+ if (tableModificationEvent != null) {
+ tableModificationEvents.add(tableModificationEvent);
+ }
}
+
+ // notify listeners of table mod events.
+ for (DatabaseUpdateEventListener listener : listeners) {
+ for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
+ listener.tableModified(tableModificationEvent);
+ }
+ }
+
return results;
}
@@ -253,4 +324,8 @@
throw new SnapshotException(e);
}
}
+
+ public void addEventListener(DatabaseUpdateEventListener listener) {
+ listeners.add(listener);
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventHandler.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventHandler.java
new file mode 100644
index 0000000..58d5446
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventHandler.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onlab.onos.store.service.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import net.jodah.expiringmap.ExpiringMap;
+import net.jodah.expiringmap.ExpiringMap.ExpirationListener;
+import net.jodah.expiringmap.ExpiringMap.ExpirationPolicy;
+import net.kuujo.copycat.cluster.Member;
+import net.kuujo.copycat.event.EventHandler;
+import net.kuujo.copycat.event.LeaderElectEvent;
+
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.service.DatabaseService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DatabaseUpdateEventHandler implements DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ public final static MessageSubject DATABASE_UPDATES = new MessageSubject("database-update-event");
+
+ private DatabaseService databaseService;
+ private ClusterService cluster;
+ private ClusterCommunicationService clusterCommunicator;
+
+ private final Member localMember;
+ private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
+ private final Map<String, Map<DatabaseRow, Void>> tableEntryExpirationMap = new HashMap<>();
+ private final ExpirationListener<DatabaseRow, Void> expirationObserver = new ExpirationObserver();
+
+ DatabaseUpdateEventHandler(Member localMember) {
+ this.localMember = localMember;
+ }
+
+ @Override
+ public void tableModified(TableModificationEvent event) {
+ DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
+ Map<DatabaseRow, Void> map = tableEntryExpirationMap.get(event.tableName());
+
+ switch (event.type()) {
+ case ROW_DELETED:
+ if (isLocalMemberLeader.get()) {
+ try {
+ clusterCommunicator.broadcast(
+ new ClusterMessage(
+ cluster.getLocalNode().id(),
+ DATABASE_UPDATES,
+ DatabaseStateMachine.SERIALIZER.encode(event)));
+ } catch (IOException e) {
+ log.error("Failed to broadcast a database table modification event.", e);
+ }
+ }
+ break;
+ case ROW_ADDED:
+ case ROW_UPDATED:
+ map.put(row, null);
+ break;
+ default:
+ break;
+ }
+ }
+
+ @Override
+ public void tableCreated(String tableName, int expirationTimeMillis) {
+ // make this explicit instead of relying on a negative value
+ // to indicate no expiration.
+ if (expirationTimeMillis > 0) {
+ tableEntryExpirationMap.put(tableName, ExpiringMap.builder()
+ .expiration(expirationTimeMillis, TimeUnit.SECONDS)
+ .expirationListener(expirationObserver)
+ // FIXME: make the expiration policy configurable.
+ .expirationPolicy(ExpirationPolicy.CREATED)
+ .build());
+ }
+ }
+
+ @Override
+ public void tableDeleted(String tableName) {
+ tableEntryExpirationMap.remove(tableName);
+ }
+
+ private class ExpirationObserver implements ExpirationListener<DatabaseRow, Void> {
+ @Override
+ public void expired(DatabaseRow key, Void value) {
+ try {
+ // TODO: The safety of this check needs to be verified.
+ // Couple of issues:
+ // 1. It is very likely that only one member should attempt deletion of the entry from database.
+ // 2. A potential race condition exists where the entry expires, but before its can be deleted
+ // from the database, a new entry is added or existing entry is updated.
+ // That means ttl and expiration should be for a given version.
+ if (isLocalMemberLeader.get()) {
+ databaseService.remove(key.tableName, key.key);
+ }
+ } catch (Exception e) {
+ log.warn("Failed to delete entry from the database after ttl expiration. Will retry eviction", e);
+ tableEntryExpirationMap.get(key.tableName).put(new DatabaseRow(key.tableName, key.key), null);
+ }
+ }
+ }
+
+ @Override
+ public void handle(LeaderElectEvent event) {
+ if (localMember.equals(event.leader())) {
+ isLocalMemberLeader.set(true);
+ }
+ }
+
+ private class DatabaseRow {
+
+ String tableName;
+ String key;
+
+ public DatabaseRow(String tableName, String key) {
+ this.tableName = tableName;
+ this.key = key;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof DatabaseRow)) {
+ return false;
+ }
+ DatabaseRow that = (DatabaseRow) obj;
+
+ return Objects.equals(this.tableName, that.tableName) &&
+ Objects.equals(this.key, that.key);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tableName, key);
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
new file mode 100644
index 0000000..f7b147f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onlab.onos.store.service.impl;
+
+public interface DatabaseUpdateEventListener {
+
+ /**
+ *
+ * @param event
+ */
+ public void tableModified(TableModificationEvent event);
+
+ /**
+ *
+ * @param tableName
+ * @param expirationTimeMillis
+ */
+ public void tableCreated(String tableName, int expirationTimeMillis);
+
+ /**
+ *
+ * @param tableName
+ */
+ public void tableDeleted(String tableName);
+
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
new file mode 100644
index 0000000..eaae063
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
@@ -0,0 +1,123 @@
+package org.onlab.onos.store.service.impl;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.joda.time.DateTime;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.store.service.DatabaseService;
+import org.onlab.onos.store.service.Lock;
+import org.onlab.onos.store.service.OptimisticLockException;
+
+/**
+ * A distributed lock implementation.
+ */
+public class DistributedLock implements Lock {
+
+ private final DistributedLockManager lockManager;
+ private final DatabaseService databaseService;
+ private final String path;
+ private DateTime lockExpirationTime;
+ private AtomicBoolean isLocked = new AtomicBoolean(false);
+ private byte[] lockId;
+
+ public DistributedLock(
+ String path,
+ DatabaseService databaseService,
+ ClusterService clusterService,
+ DistributedLockManager lockManager) {
+
+ this.path = path;
+ this.databaseService = databaseService;
+ this.lockManager = lockManager;
+ this.lockId =
+ (UUID.randomUUID().toString() + "::" + clusterService.getLocalNode().id().toString()).getBytes();
+ }
+
+ @Override
+ public String path() {
+ return path;
+ }
+
+ @Override
+ public void lock(int leaseDurationMillis) {
+
+ if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
+ // Nothing to do.
+ // Current expiration time is beyond what is requested.
+ return;
+ } else {
+ tryLock(Long.MAX_VALUE, leaseDurationMillis);
+ }
+ }
+
+ @Override
+ public boolean tryLock(int leaseDurationMillis) {
+ try {
+ databaseService.putIfAbsent(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId);
+ return true;
+ } catch (OptimisticLockException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean tryLock(
+ long waitTimeMillis,
+ int leaseDurationMillis) {
+ if (tryLock(leaseDurationMillis) == false) {
+ CompletableFuture<Void> future =
+ lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
+ try {
+ future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException | InterruptedException e) {
+ // TODO: ExecutionException could indicate something
+ // wrong with the backing database.
+ // Throw an exception?
+ return false;
+ } catch (TimeoutException e) {
+ return false;
+ }
+ }
+ lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
+ return true;
+ }
+
+ @Override
+ public boolean isLocked() {
+ if (isLocked.get()) {
+ // We rely on local information to check
+ // if the expired.
+ // This should should make this call
+ // light weight, which still retaining the same
+ // safety guarantees.
+ if (DateTime.now().isAfter(lockExpirationTime)) {
+ isLocked.set(false);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void unlock() {
+ if (!isLocked()) {
+ return;
+ } else {
+ databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId);
+ }
+ }
+
+ @Override
+ public boolean extendExpiration(int leaseDurationMillis) {
+ if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
+ return true;
+ } else {
+ return tryLock(leaseDurationMillis);
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
new file mode 100644
index 0000000..c87ab37
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
@@ -0,0 +1,159 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.joda.time.DateTime;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.service.DatabaseService;
+import org.onlab.onos.store.service.Lock;
+import org.onlab.onos.store.service.LockEventListener;
+import org.onlab.onos.store.service.LockService;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ArrayListMultimap;
+
+@Component(immediate = true)
+@Service
+public class DistributedLockManager implements LockService {
+
+ private final Logger log = getLogger(getClass());
+
+ public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
+
+ private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap.create();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private DatabaseService databaseService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ClusterService clusterService;
+
+ @Activate
+ public void activate() {
+ clusterCommunicator.addSubscriber(
+ DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
+ new LockEventMessageListener());
+ log.info("Started.");
+
+ }
+
+ @Deactivate
+ public void deactivate() {
+ locksToAcquire.clear();
+ log.info("Started.");
+ }
+
+ @Override
+ public Lock create(String path) {
+ return new DistributedLock(
+ path,
+ databaseService,
+ clusterService,
+ this);
+ }
+
+ @Override
+ public void addListener(LockEventListener listener) {
+ // FIXME:
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void removeListener(LockEventListener listener) {
+ // FIXME:
+ throw new UnsupportedOperationException();
+ }
+
+ protected CompletableFuture<Void> lockIfAvailable(Lock lock, long waitTimeMillis, int leaseDurationMillis) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ locksToAcquire.put(
+ lock.path(),
+ new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future));
+ return future;
+ }
+
+ private class LockEventMessageListener implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+ TableModificationEvent event = DatabaseStateMachine.SERIALIZER.decode(message.payload());
+ if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)) {
+ return;
+ }
+
+ String path = event.key();
+ if (!locksToAcquire.containsKey(path)) {
+ return;
+ }
+
+ if (event.type() == TableModificationEvent.Type.ROW_DELETED) {
+ List<LockRequest> existingRequests = locksToAcquire.get(path);
+ if (existingRequests == null) return;
+
+ Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
+ while (existingRequestIterator.hasNext()) {
+ LockRequest request = existingRequestIterator.next();
+ if (request.expirationTime().isAfter(DateTime.now())) {
+ existingRequestIterator.remove();
+ } else {
+ if (request.lock().tryLock(request.leaseDurationMillis()) == true) {
+ request.future().complete(null);
+ existingRequests.remove(0);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private class LockRequest {
+
+ private final Lock lock;
+ private final DateTime expirationTime;
+ private final int leaseDurationMillis;
+ private final CompletableFuture<Void> future;
+
+ public LockRequest(
+ Lock lock,
+ long waitTimeMillis,
+ int leaseDurationMillis,
+ CompletableFuture<Void> future) {
+
+ this.lock = lock;
+ this.expirationTime = DateTime.now().plusMillis((int) waitTimeMillis);
+ this.leaseDurationMillis = leaseDurationMillis;
+ this.future = future;
+ }
+
+ public Lock lock() {
+ return lock;
+ }
+
+ public DateTime expirationTime() {
+ return expirationTime;
+ }
+
+ public int leaseDurationMillis() {
+ return leaseDurationMillis;
+ }
+
+ public CompletableFuture<Void> future() {
+ return future;
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java
new file mode 100644
index 0000000..25956de
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java
@@ -0,0 +1,44 @@
+package org.onlab.onos.store.service.impl;
+
+public class TableModificationEvent {
+
+ public enum Type {
+ ROW_ADDED,
+ ROW_DELETED,
+ ROW_UPDATED
+ }
+
+ private final String tableName;
+ private final String key;
+ private final Type type;
+
+ public static TableModificationEvent rowDeleted(String tableName, String key) {
+ return new TableModificationEvent(tableName, key, Type.ROW_DELETED);
+ }
+
+ public static TableModificationEvent rowAdded(String tableName, String key) {
+ return new TableModificationEvent(tableName, key, Type.ROW_ADDED);
+ }
+
+ public static TableModificationEvent rowUpdated(String tableName, String key) {
+ return new TableModificationEvent(tableName, key, Type.ROW_UPDATED);
+ }
+
+ private TableModificationEvent(String tableName, String key, Type type) {
+ this.tableName = tableName;
+ this.key = key;
+ this.type = type;
+ }
+
+ public String tableName() {
+ return tableName;
+ }
+
+ public String key() {
+ return key;
+ }
+
+ public Type type() {
+ return type;
+ }
+}