Reworked DatabaseService API.
Initial implementation of LockManager.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index c749197..77ff062 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -8,9 +8,11 @@
import net.kuujo.copycat.Copycat;
+import org.onlab.onos.store.service.BatchReadRequest;
+import org.onlab.onos.store.service.BatchWriteRequest;
import org.onlab.onos.store.service.DatabaseException;
-import org.onlab.onos.store.service.ReadRequest;
-import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.WriteResult;
/**
* Client for interacting with the Copycat Raft cluster.
@@ -63,9 +65,9 @@
}
}
- public List<InternalReadResult> batchRead(List<ReadRequest> requests) {
+ public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
- CompletableFuture<List<InternalReadResult>> future = copycat.submit("read", requests);
+ CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
@@ -73,9 +75,9 @@
}
}
- public List<InternalWriteResult> batchWrite(List<WriteRequest> requests) {
+ public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
- CompletableFuture<List<InternalWriteResult>> future = copycat.submit("write", requests);
+ CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index afeaef9..e3de134 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -4,8 +4,6 @@
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -22,7 +20,6 @@
import net.kuujo.copycat.cluster.TcpCluster;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.log.InMemoryLog;
import net.kuujo.copycat.log.Log;
import org.apache.felix.scr.annotations.Activate;
@@ -37,17 +34,18 @@
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.service.BatchReadRequest;
+import org.onlab.onos.store.service.BatchReadResult;
+import org.onlab.onos.store.service.BatchWriteRequest;
+import org.onlab.onos.store.service.BatchWriteResult;
import org.onlab.onos.store.service.DatabaseAdminService;
import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.DatabaseService;
-import org.onlab.onos.store.service.NoSuchTableException;
-import org.onlab.onos.store.service.OptimisticLockException;
-import org.onlab.onos.store.service.OptionalResult;
-import org.onlab.onos.store.service.ReadRequest;
import org.onlab.onos.store.service.ReadResult;
-import org.onlab.onos.store.service.WriteAborted;
-import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.ReadStatus;
+import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.WriteResult;
+import org.onlab.onos.store.service.WriteStatus;
import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
@@ -199,66 +197,121 @@
}
@Override
- public ReadResult read(ReadRequest request) {
- return batchRead(Arrays.asList(request)).get(0).get();
- }
-
- @Override
- public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
- List<ReadRequest> batch) {
- List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
- for (InternalReadResult internalReadResult : client.batchRead(batch)) {
- if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
- readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
- new NoSuchTableException()));
- } else {
- readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
- internalReadResult.result()));
- }
+ public VersionedValue get(String tableName, String key) {
+ BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
+ ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
+ if (readResult.status().equals(ReadStatus.OK)) {
+ return readResult.value();
}
- return readResults;
+ throw new DatabaseException("get failed due to status: " + readResult.status());
}
@Override
- public OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request) {
- return batchWrite(Arrays.asList(request)).get(0);
+ public BatchReadResult batchRead(BatchReadRequest batchRequest) {
+ return new BatchReadResult(client.batchRead(batchRequest));
}
@Override
- public WriteResult write(WriteRequest request) {
-// throws OptimisticLockException, PreconditionFailedException {
- return writeNothrow(request).get();
+ public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
+ return new BatchWriteResult(client.batchWrite(batchRequest));
}
@Override
- public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
- List<WriteRequest> batch) {
- List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
- for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
- if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
- writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
- new NoSuchTableException()));
- } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH) {
- writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
- new OptimisticLockException()));
- } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
- // TODO: throw a different exception?
- writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
- new OptimisticLockException()));
- } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
- writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
- new WriteAborted()));
- } else {
- writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
- internalWriteResult.result()));
- }
+ public VersionedValue put(String tableName, String key, byte[] value) {
+ BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
+ WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+ if (writeResult.status().equals(WriteStatus.OK)) {
+ return writeResult.previousValue();
}
- return writeResults;
+ throw new DatabaseException("put failed due to status: " + writeResult.status());
+ }
+ @Override
+ public boolean putIfAbsent(String tableName, String key, byte[] value) {
+ BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().putIfAbsent(tableName, key, value).build();
+ WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+ if (writeResult.status().equals(WriteStatus.OK)) {
+ return true;
+ } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+ return false;
+ }
+ throw new DatabaseException("putIfAbsent failed due to status: " + writeResult.status());
+ }
+
+ @Override
+ public boolean putIfVersionMatches(String tableName, String key,
+ byte[] value, long version) {
+ BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().putIfVersionMatches(tableName, key, value, version).build();
+ WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+ if (writeResult.status().equals(WriteStatus.OK)) {
+ return true;
+ } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+ return false;
+ }
+ throw new DatabaseException("putIfVersionMatches failed due to status: " + writeResult.status());
+ }
+
+ @Override
+ public boolean putIfValueMatches(String tableName, String key,
+ byte[] oldValue, byte[] newValue) {
+ BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().putIfValueMatches(tableName, key, oldValue, newValue).build();
+ WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+ if (writeResult.status().equals(WriteStatus.OK)) {
+ return true;
+ } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+ return false;
+ }
+ throw new DatabaseException("putIfValueMatches failed due to status: " + writeResult.status());
+ }
+
+ @Override
+ public VersionedValue remove(String tableName, String key) {
+ BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().remove(tableName, key).build();
+ WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+ if (writeResult.status().equals(WriteStatus.OK)) {
+ return writeResult.previousValue();
+ }
+ throw new DatabaseException("remove failed due to status: " + writeResult.status());
+ }
+
+ @Override
+ public boolean removeIfVersionMatches(String tableName, String key,
+ long version) {
+ BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().removeIfVersionMatches(tableName, key, version).build();
+ WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+ if (writeResult.status().equals(WriteStatus.OK)) {
+ return true;
+ } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+ return false;
+ }
+ throw new DatabaseException("removeIfVersionMatches failed due to status: " + writeResult.status());
+ }
+
+ @Override
+ public boolean removeIfValueMatches(String tableName, String key,
+ byte[] value) {
+ BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().removeIfValueMatches(tableName, key, value).build();
+ WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+ if (writeResult.status().equals(WriteStatus.OK)) {
+ return true;
+ } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+ return false;
+ }
+ throw new DatabaseException("removeIfValueMatches failed due to status: " + writeResult.status());
+ }
+
+ @Override
+ public void addMember(final ControllerNode node) {
+ final TcpMember tcpMember = new TcpMember(node.ip().toString(),
+ node.tcpPort());
+ log.info("{} was added to the cluster", tcpMember);
+ synchronized (clusterConfig) {
+ clusterConfig.addRemoteMember(tcpMember);
+ }
}
private final class InternalClusterEventListener
- implements ClusterEventListener {
+ implements ClusterEventListener {
@Override
public void event(ClusterEvent event) {
@@ -266,7 +319,7 @@
final ControllerNode node = event.subject();
final TcpMember tcpMember = new TcpMember(node.ip().toString(),
- node.tcpPort());
+ node.tcpPort());
switch (event.type()) {
case INSTANCE_ACTIVATED:
@@ -284,8 +337,8 @@
case INSTANCE_REMOVED:
if (autoAddMember) {
Set<DefaultControllerNode> members
- = tabletMembers.getOrDefault(DEFAULT_TABLET,
- Collections.emptySet());
+ = tabletMembers.getOrDefault(DEFAULT_TABLET,
+ Collections.emptySet());
// remove only if not the initial members
if (!members.contains(node)) {
synchronized (clusterConfig) {
@@ -308,63 +361,6 @@
}
- public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
- public KryoRegisteredInMemoryLog() {
- super();
- // required to deserialize object across bundles
- super.kryo.register(TcpMember.class, new TcpMemberSerializer());
- super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
- }
- }
-
- private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
-
- private final R result;
- private final DatabaseException exception;
-
- public DatabaseOperationResult(R result) {
- this.result = result;
- this.exception = null;
- }
-
- public DatabaseOperationResult(DatabaseException exception) {
- this.result = null;
- this.exception = exception;
- }
-
- @Override
- public R get() {
- if (result != null) {
- return result;
- }
- throw exception;
- }
-
- @Override
- public boolean hasValidResult() {
- return result != null;
- }
-
- @Override
- public String toString() {
- if (result != null) {
- return result.toString();
- } else {
- return exception.toString();
- }
- }
- }
-
- @Override
- public void addMember(final ControllerNode node) {
- final TcpMember tcpMember = new TcpMember(node.ip().toString(),
- node.tcpPort());
- log.info("{} was added to the cluster", tcpMember);
- synchronized (clusterConfig) {
- clusterConfig.addRemoteMember(tcpMember);
- }
- }
-
@Override
public void removeMember(final ControllerNode node) {
final TcpMember tcpMember = new TcpMember(node.ip().toString(),
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 50d1203..17b174c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -8,6 +8,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
@@ -15,16 +16,21 @@
import net.kuujo.copycat.Query;
import net.kuujo.copycat.StateMachine;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.service.BatchReadRequest;
+import org.onlab.onos.store.service.BatchWriteRequest;
import org.onlab.onos.store.service.ReadRequest;
import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.ReadStatus;
import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
-import org.onlab.onos.store.service.impl.InternalWriteResult.Status;
+import org.onlab.onos.store.service.WriteStatus;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
+import com.beust.jcommander.internal.Lists;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
@@ -40,6 +46,10 @@
private final Logger log = getLogger(getClass());
+ // Message subject for database update notifications; final so it cannot be reassigned.
+ // NOTE(review): DatabaseUpdateEventHandler declares its own subject; confirm which is canonical.
+ public static final MessageSubject DATABASE_UPDATE_EVENTS = new MessageSubject("database-update-events");
+
// serializer used for snapshot
public static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
@@ -47,29 +57,72 @@
serializerPool = KryoNamespace.newBuilder()
.register(VersionedValue.class)
.register(State.class)
+ .register(BatchReadRequest.class)
+ .register(BatchWriteRequest.class)
+ .register(ReadStatus.class)
+ .register(WriteStatus.class)
+ // TODO: Move this out ?
+ .register(TableModificationEvent.class)
.register(ClusterMessagingProtocol.COMMON)
.build()
.populate(1);
}
};
+ private final List<DatabaseUpdateEventListener> listeners = Lists.newLinkedList();
+
+ // durable internal state of the database.
private State state = new State();
private boolean compressSnapshot = false;
@Command
public boolean createTable(String tableName) {
- return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null;
+ Map<String, VersionedValue> existingTable =
+ state.getTables().putIfAbsent(tableName, Maps.newHashMap());
+ if (existingTable == null) {
+ for (DatabaseUpdateEventListener listener : listeners) {
+ listener.tableCreated(tableName, Integer.MAX_VALUE);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Command
+ public boolean createTable(String tableName, int expirationTimeMillis) {
+ Map<String, VersionedValue> existingTable =
+ state.getTables().putIfAbsent(tableName, Maps.newHashMap());
+ if (existingTable == null) {
+ for (DatabaseUpdateEventListener listener : listeners) {
+ listener.tableCreated(tableName, expirationTimeMillis);
+ }
+ return true;
+ }
+ return false;
}
@Command
public boolean dropTable(String tableName) {
- return state.getTables().remove(tableName) != null;
+ Map<String, VersionedValue> table = state.getTables().remove(tableName);
+ if (table != null) {
+ for (DatabaseUpdateEventListener listener : listeners) {
+ listener.tableDeleted(tableName);
+ }
+ return true;
+ }
+ return false;
}
@Command
public boolean dropAllTables() {
+ // Snapshot the names first: keySet() is a live view that clear() empties,
+ // which would leave nothing to report to the listeners below.
+ Set<String> tableNames = new java.util.HashSet<>(state.getTables().keySet());
state.getTables().clear();
+ for (DatabaseUpdateEventListener listener : listeners) {
+ tableNames.forEach(listener::tableDeleted);
+ }
return true;
}
@@ -79,96 +132,95 @@
}
@Query
- public List<InternalReadResult> read(List<ReadRequest> requests) {
- List<InternalReadResult> results = new ArrayList<>(requests.size());
- for (ReadRequest request : requests) {
+ public List<ReadResult> read(BatchReadRequest batchRequest) {
+ List<ReadResult> results = new ArrayList<>(batchRequest.batchSize());
+ for (ReadRequest request : batchRequest.getAsList()) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
if (table == null) {
- results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
+ results.add(new ReadResult(ReadStatus.NO_SUCH_TABLE, request.tableName(), request.key(), null));
continue;
}
VersionedValue value = VersionedValue.copy(table.get(request.key()));
- results.add(new InternalReadResult(
- InternalReadResult.Status.OK,
- new ReadResult(
- request.tableName(),
- request.key(),
- value)));
+ results.add(new ReadResult(ReadStatus.OK, request.tableName(), request.key(), value));
}
return results;
}
- InternalWriteResult.Status checkIfApplicable(WriteRequest request,
- VersionedValue value) {
+ WriteStatus checkIfApplicable(WriteRequest request,
+ VersionedValue value) {
switch (request.type()) {
case PUT:
- return InternalWriteResult.Status.OK;
+ return WriteStatus.OK;
case PUT_IF_ABSENT:
if (value == null) {
- return InternalWriteResult.Status.OK;
+ return WriteStatus.OK;
}
- return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
+ return WriteStatus.PRECONDITION_VIOLATION;
case PUT_IF_VALUE:
case REMOVE_IF_VALUE:
if (value != null && Arrays.equals(value.value(), request.oldValue())) {
- return InternalWriteResult.Status.OK;
+ return WriteStatus.OK;
}
- return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
+ return WriteStatus.PRECONDITION_VIOLATION;
case PUT_IF_VERSION:
case REMOVE_IF_VERSION:
if (value != null && request.previousVersion() == value.version()) {
- return InternalWriteResult.Status.OK;
+ return WriteStatus.OK;
}
- return InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH;
+ return WriteStatus.PRECONDITION_VIOLATION;
case REMOVE:
- return InternalWriteResult.Status.OK;
+ return WriteStatus.OK;
default:
break;
}
log.error("Should never reach here {}", request);
- return InternalWriteResult.Status.ABORTED;
+ return WriteStatus.ABORTED;
}
@Command
- public List<InternalWriteResult> write(List<WriteRequest> requests) {
+ public List<WriteResult> write(BatchWriteRequest batchRequest) {
// applicability check
boolean abort = false;
- List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
- for (WriteRequest request : requests) {
+ List<WriteStatus> validationResults = new ArrayList<>(batchRequest.batchSize());
+ for (WriteRequest request : batchRequest.getAsList()) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
if (table == null) {
- validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE);
+ validationResults.add(WriteStatus.NO_SUCH_TABLE);
abort = true;
continue;
}
final VersionedValue value = table.get(request.key());
- Status result = checkIfApplicable(request, value);
+ WriteStatus result = checkIfApplicable(request, value);
validationResults.add(result);
- if (result != Status.OK) {
+ if (result != WriteStatus.OK) {
abort = true;
}
}
- List<InternalWriteResult> results = new ArrayList<>(requests.size());
+ List<WriteResult> results = new ArrayList<>(batchRequest.batchSize());
if (abort) {
- for (InternalWriteResult.Status validationResult : validationResults) {
- if (validationResult == InternalWriteResult.Status.OK) {
+ for (WriteStatus validationResult : validationResults) {
+ if (validationResult == WriteStatus.OK) {
// aborted due to applicability check failure on other request
- results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
+ results.add(new WriteResult(WriteStatus.ABORTED, null));
} else {
- results.add(new InternalWriteResult(validationResult, null));
+ results.add(new WriteResult(validationResult, null));
}
}
return results;
}
+ List<TableModificationEvent> tableModificationEvents = Lists.newLinkedList();
+
// apply changes
- for (WriteRequest request : requests) {
+ for (WriteRequest request : batchRequest.getAsList()) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+
+ TableModificationEvent tableModificationEvent = null;
// FIXME: If this method could be called by multiple thread,
// synchronization scope is wrong.
// Whole function including applicability check needs to be protected.
@@ -182,16 +234,23 @@
case PUT_IF_VERSION:
VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion());
VersionedValue previousValue = table.put(request.key(), newValue);
- WriteResult putResult = new WriteResult(request.tableName(), request.key(), previousValue);
- results.add(InternalWriteResult.ok(putResult));
+ WriteResult putResult = new WriteResult(WriteStatus.OK, previousValue);
+ results.add(putResult);
+ tableModificationEvent = (previousValue == null) ?
+ TableModificationEvent.rowAdded(request.tableName(), request.key()) :
+ TableModificationEvent.rowUpdated(request.tableName(), request.key());
break;
case REMOVE:
case REMOVE_IF_VALUE:
case REMOVE_IF_VERSION:
VersionedValue removedValue = table.remove(request.key());
- WriteResult removeResult = new WriteResult(request.tableName(), request.key(), removedValue);
- results.add(InternalWriteResult.ok(removeResult));
+ WriteResult removeResult = new WriteResult(WriteStatus.OK, removedValue);
+ results.add(removeResult);
+ if (removedValue != null) {
+ tableModificationEvent =
+ TableModificationEvent.rowDeleted(request.tableName(), request.key());
+ }
break;
default:
@@ -199,7 +258,19 @@
break;
}
}
+
+ if (tableModificationEvent != null) {
+ tableModificationEvents.add(tableModificationEvent);
+ }
}
+
+ // notify listeners of table mod events.
+ for (DatabaseUpdateEventListener listener : listeners) {
+ for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
+ listener.tableModified(tableModificationEvent);
+ }
+ }
+
return results;
}
@@ -253,4 +324,8 @@
throw new SnapshotException(e);
}
}
+
+ public void addEventListener(DatabaseUpdateEventListener listener) {
+ listeners.add(listener);
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventHandler.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventHandler.java
new file mode 100644
index 0000000..58d5446
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventHandler.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onlab.onos.store.service.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import net.jodah.expiringmap.ExpiringMap;
+import net.jodah.expiringmap.ExpiringMap.ExpirationListener;
+import net.jodah.expiringmap.ExpiringMap.ExpirationPolicy;
+import net.kuujo.copycat.cluster.Member;
+import net.kuujo.copycat.event.EventHandler;
+import net.kuujo.copycat.event.LeaderElectEvent;
+
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.service.DatabaseService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens for database table changes to track row expiration and, while the
+ * local member is the elected leader, broadcasts row deletions to the cluster
+ * and evicts rows whose time-to-live has elapsed.
+ */
+public class DatabaseUpdateEventHandler implements DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    public static final MessageSubject DATABASE_UPDATES = new MessageSubject("database-update-event");
+
+    // NOTE(review): nothing in this class assigns the three collaborators below;
+    // confirm how they are injected, otherwise they are null when dereferenced.
+    private DatabaseService databaseService;
+    private ClusterService cluster;
+    private ClusterCommunicationService clusterCommunicator;
+
+    private final Member localMember;
+    private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
+
+    // Per-table expiring maps, keyed by table name; only tables with a finite
+    // expiration time have an entry.
+    private final Map<String, Map<DatabaseRow, Void>> tableEntryExpirationMap = new HashMap<>();
+    private final ExpirationListener<DatabaseRow, Void> expirationObserver = new ExpirationObserver();
+
+    /**
+     * Creates a handler acting on behalf of the given cluster member.
+     *
+     * @param localMember the local member, compared against elected leaders
+     */
+    DatabaseUpdateEventHandler(Member localMember) {
+        this.localMember = localMember;
+    }
+
+    /**
+     * Reacts to a row-level change in a table.
+     * <p>
+     * Deletions are broadcast on {@link #DATABASE_UPDATES} when the local member
+     * is the leader. Additions and updates (re-)register the row in the table's
+     * expiration map, if the table has one.
+     *
+     * @param event table modification event
+     */
+    @Override
+    public void tableModified(TableModificationEvent event) {
+        DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
+        Map<DatabaseRow, Void> map = tableEntryExpirationMap.get(event.tableName());
+
+        switch (event.type()) {
+            case ROW_DELETED:
+                if (isLocalMemberLeader.get()) {
+                    try {
+                        clusterCommunicator.broadcast(
+                                new ClusterMessage(
+                                        cluster.getLocalNode().id(),
+                                        DATABASE_UPDATES,
+                                        DatabaseStateMachine.SERIALIZER.encode(event)));
+                    } catch (IOException e) {
+                        log.error("Failed to broadcast a database table modification event.", e);
+                    }
+                }
+                break;
+            case ROW_ADDED:
+            case ROW_UPDATED:
+                // Tables without a finite expiration time have no expiration map,
+                // so there is nothing to track for their rows.
+                if (map != null) {
+                    map.put(row, null);
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Starts tracking row expiration for a newly created table.
+     *
+     * @param tableName name of the created table
+     * @param expirationTimeMillis row time-to-live in milliseconds; values that
+     *        are not positive, or {@link Integer#MAX_VALUE}, mean rows never expire
+     */
+    @Override
+    public void tableCreated(String tableName, int expirationTimeMillis) {
+        // make this explicit instead of relying on a negative value
+        // to indicate no expiration.
+        // Integer.MAX_VALUE is what DatabaseStateMachine.createTable(String) passes
+        // for tables created without an expiration time.
+        if (expirationTimeMillis > 0 && expirationTimeMillis < Integer.MAX_VALUE) {
+            tableEntryExpirationMap.put(tableName, ExpiringMap.builder()
+                    .expiration(expirationTimeMillis, TimeUnit.MILLISECONDS)
+                    .expirationListener(expirationObserver)
+                    // FIXME: make the expiration policy configurable.
+                    .expirationPolicy(ExpirationPolicy.CREATED)
+                    .build());
+        }
+    }
+
+    /**
+     * Stops tracking row expiration for a dropped table.
+     *
+     * @param tableName name of the deleted table
+     */
+    @Override
+    public void tableDeleted(String tableName) {
+        tableEntryExpirationMap.remove(tableName);
+    }
+
+    /**
+     * Removes expired rows from the database when the local member is the leader.
+     */
+    private class ExpirationObserver implements ExpirationListener<DatabaseRow, Void> {
+        @Override
+        public void expired(DatabaseRow key, Void value) {
+            try {
+                // TODO: The safety of this check needs to be verified.
+                // Couple of issues:
+                // 1. It is very likely that only one member should attempt deletion of the entry from database.
+                // 2. A potential race condition exists where the entry expires, but before it can be deleted
+                // from the database, a new entry is added or existing entry is updated.
+                // That means ttl and expiration should be for a given version.
+                if (isLocalMemberLeader.get()) {
+                    databaseService.remove(key.tableName, key.key);
+                }
+            } catch (Exception e) {
+                log.warn("Failed to delete entry from the database after ttl expiration. Will retry eviction", e);
+                // Re-register the row so it expires again; the table may have been
+                // dropped in the meantime, in which case there is nothing to retry.
+                Map<DatabaseRow, Void> map = tableEntryExpirationMap.get(key.tableName);
+                if (map != null) {
+                    map.put(new DatabaseRow(key.tableName, key.key), null);
+                }
+            }
+        }
+    }
+
+    /**
+     * Records whether the local member is the newly elected leader.
+     *
+     * @param event leader election event
+     */
+    @Override
+    public void handle(LeaderElectEvent event) {
+        // Also clear the flag when another member wins, so that a deposed leader
+        // stops broadcasting and evicting.
+        isLocalMemberLeader.set(localMember.equals(event.leader()));
+    }
+
+    /**
+     * Identifies a row by table name and key; used as the expiration map key.
+     */
+    private static final class DatabaseRow {
+
+        private final String tableName;
+        private final String key;
+
+        public DatabaseRow(String tableName, String key) {
+            this.tableName = tableName;
+            this.key = key;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof DatabaseRow)) {
+                return false;
+            }
+            DatabaseRow that = (DatabaseRow) obj;
+
+            return Objects.equals(this.tableName, that.tableName) &&
+                    Objects.equals(this.key, that.key);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tableName, key);
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
new file mode 100644
index 0000000..f7b147f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onlab.onos.store.service.impl;
+
+/**
+ * Callbacks for table-level and row-level changes applied to the database.
+ */
+public interface DatabaseUpdateEventListener {
+
+    /**
+     * Notifies the listener that a row was added, updated or deleted.
+     *
+     * @param event table modification event
+     */
+    void tableModified(TableModificationEvent event);
+
+    /**
+     * Notifies the listener that a table was created.
+     *
+     * @param tableName name of the created table
+     * @param expirationTimeMillis expiration time for the table's rows, in milliseconds
+     */
+    void tableCreated(String tableName, int expirationTimeMillis);
+
+    /**
+     * Notifies the listener that a table was deleted.
+     *
+     * @param tableName name of the deleted table
+     */
+    void tableDeleted(String tableName);
+
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
new file mode 100644
index 0000000..eaae063
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
@@ -0,0 +1,123 @@
+package org.onlab.onos.store.service.impl;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.joda.time.DateTime;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.store.service.DatabaseService;
+import org.onlab.onos.store.service.Lock;
+import org.onlab.onos.store.service.OptimisticLockException;
+
+/**
+ * A distributed lock implementation.
+ */
+public class DistributedLock implements Lock {
+
+    private final DistributedLockManager lockManager;
+    private final DatabaseService databaseService;
+    private final String path;
+    private final byte[] lockId; // row value: random UUID + "::" + local node id
+    private final AtomicBoolean isLocked = new AtomicBoolean(false); // set on acquire, cleared by unlock()
+    // Local lease deadline; volatile because the lock manager's handler thread may set it.
+    private volatile DateTime lockExpirationTime;
+
+    /** Creates an unlocked handle for {@code path}; nothing is written to the database yet. */
+    public DistributedLock(
+            String path,
+            DatabaseService databaseService,
+            ClusterService clusterService,
+            DistributedLockManager lockManager) {
+        this.path = path;
+        this.databaseService = databaseService;
+        this.lockManager = lockManager;
+        String id = UUID.randomUUID().toString() + "::" + clusterService.getLocalNode().id().toString();
+        // Explicit charset so the id bytes do not depend on the platform default.
+        this.lockId = id.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public String path() {
+        return path;
+    }
+
+    /** Blocks until acquired, unless a lease outlasting the requested one is already held. */
+    @Override
+    public void lock(int leaseDurationMillis) {
+        if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
+            return; // current expiration time is beyond what is requested
+        }
+        tryLock(Long.MAX_VALUE, leaseDurationMillis);
+    }
+
+    /**
+     * Makes one non-blocking attempt to acquire the lock by inserting its row.
+     *
+     * @param leaseDurationMillis lease length, tracked locally from the moment of success
+     * @return true if acquired, false if the insert was rejected
+     */
+    @Override
+    public boolean tryLock(int leaseDurationMillis) {
+        try {
+            // NOTE(review): assumes rejection is signalled only by the exception; confirm.
+            databaseService.putIfAbsent(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId);
+        } catch (OptimisticLockException e) {
+            return false;
+        }
+        // Publish the deadline before the flag so isLocked() never sees a null deadline.
+        lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
+        isLocked.set(true);
+        return true;
+    }
+
+    /**
+     * Tries to acquire the lock, waiting up to {@code waitTimeMillis} for it to be released.
+     *
+     * @return true if acquired; false on timeout, interruption or failure
+     */
+    @Override
+    public boolean tryLock(long waitTimeMillis, int leaseDurationMillis) {
+        if (tryLock(leaseDurationMillis)) {
+            return true;
+        }
+        CompletableFuture<Void> future =
+                lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
+        try {
+            future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+            return false;
+        } catch (ExecutionException | TimeoutException e) {
+            // TODO: ExecutionException could indicate a problem with the backing database.
+            return false;
+        }
+        return true; // the manager acquires through tryLock(int), which recorded the lease
+    }
+
+    /** Returns true only if this handle acquired the lock and its local lease has not lapsed. */
+    @Override
+    public boolean isLocked() {
+        return isLocked.get() && !DateTime.now().isAfter(lockExpirationTime);
+    }
+
+    /** Releases the lock; removal is requested only for a row holding this handle's id. */
+    @Override
+    public void unlock() {
+        if (isLocked.getAndSet(false)) {
+            databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId);
+        }
+    }
+
+    /** NOTE(review): re-inserting an existing row presumably fails, so a held lease may not extend. */
+    @Override
+    public boolean extendExpiration(int leaseDurationMillis) {
+        if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
+            return true;
+        }
+        return tryLock(leaseDurationMillis);
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
new file mode 100644
index 0000000..c87ab37
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
@@ -0,0 +1,159 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.joda.time.DateTime;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.service.DatabaseService;
+import org.onlab.onos.store.service.Lock;
+import org.onlab.onos.store.service.LockEventListener;
+import org.onlab.onos.store.service.LockService;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ArrayListMultimap;
+
+/**
+ * Lock service that retries queued lock requests when their row is deleted from the lock table.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedLockManager implements LockService {
+
+    private final Logger log = getLogger(getClass());
+
+    public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
+
+    // Pending requests by lock path; not thread-safe, so always accessed under its own monitor.
+    private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap.create();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private DatabaseService databaseService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterService clusterService;
+
+    @Activate
+    public void activate() {
+        clusterCommunicator.addSubscriber(
+                DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
+                new LockEventMessageListener());
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        synchronized (locksToAcquire) {
+            locksToAcquire.clear();
+        }
+        log.info("Stopped.");
+    }
+
+    @Override
+    public Lock create(String path) {
+        return new DistributedLock(path, databaseService, clusterService, this);
+    }
+
+    @Override
+    public void addListener(LockEventListener listener) {
+        // FIXME:
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void removeListener(LockEventListener listener) {
+        // FIXME:
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Queues a request to acquire {@code lock}; it is retried when the lock's row is deleted.
+     * @param lock lock to acquire
+     * @param waitTimeMillis how long, from now, the request remains eligible for retries
+     * @param leaseDurationMillis lease duration handed to {@code Lock.tryLock} on a retry
+     * @return future completed with null once a retry has acquired the lock
+     */
+    protected CompletableFuture<Void> lockIfAvailable(Lock lock, long waitTimeMillis, int leaseDurationMillis) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        LockRequest request = new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future);
+        synchronized (locksToAcquire) {
+            locksToAcquire.put(lock.path(), request);
+        }
+        return future;
+    }
+
+    /** Retries queued requests for a path when that path's row is deleted from the lock table. */
+    private class LockEventMessageListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            TableModificationEvent event = DatabaseStateMachine.SERIALIZER.decode(message.payload());
+            if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)
+                    || event.type() != TableModificationEvent.Type.ROW_DELETED) {
+                return;
+            }
+            synchronized (locksToAcquire) {
+                List<LockRequest> pending = locksToAcquire.get(event.key()); // live view of the multimap
+                Iterator<LockRequest> iterator = pending.iterator();
+                while (iterator.hasNext()) {
+                    LockRequest request = iterator.next();
+                    if (DateTime.now().isAfter(request.expirationTime())) {
+                        iterator.remove(); // wait time is over; drop the stale request
+                    } else if (request.lock().tryLock(request.leaseDurationMillis())) {
+                        iterator.remove();
+                        request.future().complete(null);
+                        break; // the row exists again; the rest wait for the next delete
+                    }
+                }
+            }
+        }
+    }
+
+    /** A queued lock acquisition and the instant after which it is no longer retried. */
+    private static final class LockRequest {
+        private final Lock lock;
+        private final DateTime expirationTime;
+        private final int leaseDurationMillis;
+        private final CompletableFuture<Void> future;
+
+        LockRequest(Lock lock, long waitTimeMillis, int leaseDurationMillis, CompletableFuture<Void> future) {
+            this.lock = lock;
+            // Saturating add: an (int) cast would turn a Long.MAX_VALUE wait into -1 ms.
+            long now = DateTime.now().getMillis();
+            long deadline = waitTimeMillis > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + waitTimeMillis;
+            this.expirationTime = new DateTime(deadline);
+            this.leaseDurationMillis = leaseDurationMillis;
+            this.future = future;
+        }
+
+        Lock lock() {
+            return lock;
+        }
+
+        DateTime expirationTime() {
+            return expirationTime;
+        }
+
+        int leaseDurationMillis() {
+            return leaseDurationMillis;
+        }
+
+        CompletableFuture<Void> future() {
+            return future;
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java
new file mode 100644
index 0000000..25956de
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java
@@ -0,0 +1,44 @@
+package org.onlab.onos.store.service.impl;
+
+/** Immutable description of one row change: the table, the row key and the kind of change. */
+public class TableModificationEvent {
+
+    public enum Type { ROW_ADDED, ROW_DELETED, ROW_UPDATED }
+
+    private final String tableName;
+    private final String key;
+    private final Type type;
+
+    private TableModificationEvent(String tableName, String key, Type type) {
+        this.tableName = tableName;
+        this.key = key;
+        this.type = type;
+    }
+
+    public static TableModificationEvent rowAdded(String tableName, String key) {
+        return new TableModificationEvent(tableName, key, Type.ROW_ADDED);
+    }
+
+    public static TableModificationEvent rowUpdated(String tableName, String key) {
+        return new TableModificationEvent(tableName, key, Type.ROW_UPDATED);
+    }
+
+    public static TableModificationEvent rowDeleted(String tableName, String key) {
+        return new TableModificationEvent(tableName, key, Type.ROW_DELETED);
+    }
+
+    /** Returns the name of the table the event refers to. */
+    public String tableName() {
+        return tableName;
+    }
+
+    /** Returns the key of the row the event refers to. */
+    public String key() {
+        return key;
+    }
+
+    /** Returns the kind of modification this event reports. */
+    public Type type() {
+        return type;
+    }
+}