DatabaseService that uses Copycat Raft to provide a strongly consistent and durable database.
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index eebb8ff..846583f 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -44,6 +44,24 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>net.kuujo.copycat</groupId>
+ <artifactId>copycat</artifactId>
+ <version>0.4.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>net.kuujo.copycat</groupId>
+ <artifactId>copycat-chronicle</artifactId>
+ <version>0.4.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>net.kuujo.copycat</groupId>
+ <artifactId>copycat-tcp</artifactId>
+ <version>0.4.0-SNAPSHOT</version>
+ </dependency>
+
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java
new file mode 100644
index 0000000..7cb5f55
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java
@@ -0,0 +1,35 @@
+package org.onlab.onos.store.service;
+
+import java.util.List;
+
+/**
+ * Service for running administrative tasks on a Database.
+ */
+public interface DatabaseAdminService {
+
+ /**
+ * Creates a new table.
+ * Table creation is idempotent. Attempting to create a table
+ * that already exists will be a noop.
+ * @param name table name.
+ * @return true if the table was created by this call, false otherwise.
+ */
+ public boolean createTable(String name);
+
+ /**
+ * Lists all the tables in the database.
+ * @return list of table names.
+ */
+ public List<String> listTables();
+
+ /**
+ * Deletes a table from the database.
+ * @param name name of the table to delete.
+ */
+ public void dropTable(String name);
+
+ /**
+ * Deletes all tables from the database.
+ */
+ public void dropAllTables();
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseException.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseException.java
new file mode 100644
index 0000000..bbc2daf
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseException.java
@@ -0,0 +1,22 @@
+package org.onlab.onos.store.service;
+
+/**
+ * Base exception type for database failures.
+ */
+@SuppressWarnings("serial")
+public class DatabaseException extends RuntimeException {
+ public DatabaseException(String message, Throwable t) {
+ super(message, t);
+ }
+
+ public DatabaseException(String message) {
+ super(message);
+ }
+
+ public DatabaseException(Throwable t) {
+ super(t);
+ }
+
+ public DatabaseException() {
+ };
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java
new file mode 100644
index 0000000..08bfa36
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/DatabaseService.java
@@ -0,0 +1,39 @@
+package org.onlab.onos.store.service;
+
+import java.util.List;
+
+public interface DatabaseService {
+
+ /**
+ * Performs a read on the database.
+ * @param request read request.
+ * @return ReadResult
+ * @throws DatabaseException
+ */
+ ReadResult read(ReadRequest request);
+
+ /**
+ * Performs a batch read operation on the database.
+ * The main advantage of batch read operation is parallelization.
+ * @param batch batch of read requests to execute.
+ * @return
+ */
+ List<OptionalResult<ReadResult, DatabaseException>> batchRead(List<ReadRequest> batch);
+
+ /**
+ * Performs a write operation on the database.
+ * @param request
+ * @return write result.
+ * @throws DatabaseException
+ */
+ WriteResult write(WriteRequest request);
+
+ /**
+ * Performs a batch write operation on the database.
+ * Batch write provides transactional semantics. Either all operations
+ * succeed or none of them do.
+ * @param batch batch of write requests to execute as a transaction.
+ * @return result of executing the batch write operation.
+ */
+ List<OptionalResult<WriteResult, DatabaseException>> batchWrite(List<WriteRequest> batch);
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/NoSuchTableException.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/NoSuchTableException.java
new file mode 100644
index 0000000..fad17ce
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/NoSuchTableException.java
@@ -0,0 +1,9 @@
+package org.onlab.onos.store.service;
+
+/**
+ * Exception thrown when an operation (read or write) is requested for
+ * a table that does not exist.
+ */
+@SuppressWarnings("serial")
+public class NoSuchTableException extends DatabaseException {
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/OptimisticLockException.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/OptimisticLockException.java
new file mode 100644
index 0000000..090eb63
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/OptimisticLockException.java
@@ -0,0 +1,8 @@
+package org.onlab.onos.store.service;
+
+/**
+ * Exception that indicates a optimistic lock failure.
+ */
+@SuppressWarnings("serial")
+public class OptimisticLockException extends PreconditionFailedException {
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/OptionalResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/OptionalResult.java
new file mode 100644
index 0000000..fa38e3f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/OptionalResult.java
@@ -0,0 +1,26 @@
+package org.onlab.onos.store.service;
+
+/**
+ * A container object which either has a result or an exception.
+ * <p>
+ * If a result is present, get() will return it otherwise get() will throw
+ * the exception that was encountered in the process of generating the result.
+ *
+ * @param <R> type of result.
+ * @param <E> exception encountered in generating the result.
+ */
+public interface OptionalResult<R, E extends Throwable> {
+
+ /**
+ * Returns the result.
+ * @return result
+ * @throws E if there is no valid result.
+ */
+ public R get();
+
+ /**
+ * Returns true if there is a valid result.
+ * @return true is yes, false otherwise.
+ */
+ public boolean hasValidResult();
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/PreconditionFailedException.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/PreconditionFailedException.java
new file mode 100644
index 0000000..f16fc47
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/PreconditionFailedException.java
@@ -0,0 +1,14 @@
+package org.onlab.onos.store.service;
+
+/**
+ * Exception that indicates a precondition failure.
+ * <ul>Scenarios that can cause this exception:
+ * <li>An operation that attempts to write a new value iff the current value is equal
+ * to some specified value.</li>
+ * <li>An operation that attempts to write a new value iff the current version
+ * matches a specified value</li>
+ * </ul>
+ */
+@SuppressWarnings("serial")
+public class PreconditionFailedException extends DatabaseException {
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java
new file mode 100644
index 0000000..6c0ba30
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadRequest.java
@@ -0,0 +1,36 @@
+package org.onlab.onos.store.service;
+
+/**
+ * Database read request.
+ */
+public class ReadRequest {
+
+ private final String tableName;
+ private final String key;
+
+ public ReadRequest(String tableName, String key) {
+ this.tableName = tableName;
+ this.key = key;
+ }
+
+ /**
+ * Return the name of the table.
+ * @return table name.
+ */
+ public String tableName() {
+ return tableName;
+ }
+
+ /**
+ * Returns the key.
+ * @return key.
+ */
+ public String key() {
+ return key;
+ }
+
+ @Override
+ public String toString() {
+ return "ReadRequest [tableName=" + tableName + ", key=" + key + "]";
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java
new file mode 100644
index 0000000..2d7649a
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/ReadResult.java
@@ -0,0 +1,43 @@
+package org.onlab.onos.store.service;
+
+import org.onlab.onos.store.service.impl.VersionedValue;
+
+/**
+ * Database read result.
+ */
+public class ReadResult {
+
+ private final String tableName;
+ private final String key;
+ private final VersionedValue value;
+
+ public ReadResult(String tableName, String key, VersionedValue value) {
+ this.tableName = tableName;
+ this.key = key;
+ this.value = value;
+ }
+
+ /**
+ * Database table name.
+ * @return
+ */
+ public String tableName() {
+ return tableName;
+ }
+
+ /**
+ * Database table key.
+ * @return key.
+ */
+ public String key() {
+ return key;
+ }
+
+ /**
+ * value associated with the key.
+ * @return non-null value if the table contains one, null otherwise.
+ */
+ public VersionedValue value() {
+ return value;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteAborted.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteAborted.java
new file mode 100644
index 0000000..a7d3fe3
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteAborted.java
@@ -0,0 +1,9 @@
+package org.onlab.onos.store.service;
+
+/**
+ * Exception that indicates a write operation is aborted.
+ * Aborted operations do not mutate database state is any form.
+ */
+@SuppressWarnings("serial")
+public class WriteAborted extends DatabaseException {
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteRequest.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteRequest.java
new file mode 100644
index 0000000..7314e4f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteRequest.java
@@ -0,0 +1,93 @@
+package org.onlab.onos.store.service;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Objects;
+
+/**
+ * Database write request.
+ */
+public class WriteRequest {
+
+ private final String tableName;
+ private final String key;
+ private final byte[] newValue;
+ private final long previousVersion;
+ private final byte[] oldValue;
+
+ public WriteRequest(String tableName, String key, byte[] newValue) {
+ this(tableName, key, newValue, -1, null);
+ }
+
+ public WriteRequest(String tableName, String key, byte[] newValue, long previousVersion) {
+ this(tableName, key, newValue, previousVersion, null);
+ checkArgument(previousVersion >= 0);
+ }
+
+ public WriteRequest(String tableName, String key, byte[] newValue, byte[] oldValue) {
+ this(tableName, key, newValue, -1, oldValue);
+ }
+
+ private WriteRequest(String tableName, String key, byte[] newValue, long previousVersion, byte[] oldValue) {
+
+ checkArgument(tableName != null);
+ checkArgument(key != null);
+ checkArgument(newValue != null);
+
+ this.tableName = tableName;
+ this.key = key;
+ this.newValue = newValue;
+ this.previousVersion = previousVersion;
+ this.oldValue = oldValue;
+ }
+
+ public String tableName() {
+ return tableName;
+ }
+
+ public String key() {
+ return key;
+ }
+
+ public byte[] newValue() {
+ return newValue;
+ }
+
+ public long previousVersion() {
+ return previousVersion;
+ }
+
+ public byte[] oldValue() {
+ return oldValue;
+ }
+
+ @Override
+ public String toString() {
+ return "WriteRequest [tableName=" + tableName + ", key=" + key
+ + ", newValue=" + newValue
+ + ", previousVersion=" + previousVersion
+ + ", oldValue=" + oldValue;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, tableName, previousVersion);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ WriteRequest other = (WriteRequest) obj;
+ return Objects.equals(this.key, other.key) &&
+ Objects.equals(this.tableName, other.tableName) &&
+ Objects.equals(this.previousVersion, other.previousVersion);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteResult.java
new file mode 100644
index 0000000..64e9b74
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/WriteResult.java
@@ -0,0 +1,31 @@
+package org.onlab.onos.store.service;
+
+import org.onlab.onos.store.service.impl.VersionedValue;
+
+/**
+ * Database write result.
+ */
+public class WriteResult {
+
+ private final String tableName;
+ private final String key;
+ private final VersionedValue previousValue;
+
+ public WriteResult(String tableName, String key, VersionedValue previousValue) {
+ this.tableName = tableName;
+ this.key = key;
+ this.previousValue = previousValue;
+ }
+
+ public String tableName() {
+ return tableName;
+ }
+
+ public String key() {
+ return key;
+ }
+
+ public VersionedValue previousValue() {
+ return previousValue;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
new file mode 100644
index 0000000..3c92800
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -0,0 +1,144 @@
+package org.onlab.onos.store.service.impl;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import net.kuujo.copycat.protocol.Response.Status;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.onlab.netty.Endpoint;
+import org.onlab.netty.NettyMessagingService;
+import org.onlab.onos.store.service.DatabaseException;
+import org.onlab.onos.store.service.ReadRequest;
+import org.onlab.onos.store.service.WriteRequest;
+
+public class DatabaseClient {
+
+ private final Endpoint copycatEp;
+ ProtocolClient client;
+ NettyMessagingService messagingService;
+
+ public DatabaseClient(Endpoint copycatEp) {
+ this.copycatEp = copycatEp;
+ }
+
+ private static String nextId() {
+ return UUID.randomUUID().toString();
+ }
+
+ public void activate() throws Exception {
+ messagingService = new NettyMessagingService(RandomUtils.nextInt(10000, 40000));
+ messagingService.activate();
+ client = new NettyProtocolClient(copycatEp, messagingService);
+ }
+
+ public void deactivate() throws Exception {
+ messagingService.deactivate();
+ }
+
+ public boolean createTable(String tableName) {
+
+ SubmitRequest request =
+ new SubmitRequest(
+ nextId(),
+ "createTable",
+ Arrays.asList(tableName));
+ CompletableFuture<SubmitResponse> future = client.submit(request);
+ try {
+ return (boolean) future.get().result();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ public void dropTable(String tableName) {
+
+ SubmitRequest request =
+ new SubmitRequest(
+ nextId(),
+ "dropTable",
+ Arrays.asList(tableName));
+ CompletableFuture<SubmitResponse> future = client.submit(request);
+ try {
+ if (future.get().status() == Status.OK) {
+ throw new DatabaseException(future.get().toString());
+ }
+
+ } catch (InterruptedException | ExecutionException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ public void dropAllTables() {
+
+ SubmitRequest request =
+ new SubmitRequest(
+ nextId(),
+ "dropAllTables",
+ Arrays.asList());
+ CompletableFuture<SubmitResponse> future = client.submit(request);
+ try {
+ if (future.get().status() != Status.OK) {
+ throw new DatabaseException(future.get().toString());
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public List<String> listTables() {
+
+ SubmitRequest request =
+ new SubmitRequest(
+ nextId(),
+ "listTables",
+ Arrays.asList());
+ CompletableFuture<SubmitResponse> future = client.submit(request);
+ try {
+ return (List<String>) future.get().result();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public List<InternalReadResult> batchRead(List<ReadRequest> requests) {
+
+ SubmitRequest request = new SubmitRequest(
+ nextId(),
+ "read",
+ Arrays.asList(requests));
+
+ CompletableFuture<SubmitResponse> future = client.submit(request);
+ try {
+ List<InternalReadResult> internalReadResults = (List<InternalReadResult>) future.get().result();
+ return internalReadResults;
+ } catch (InterruptedException | ExecutionException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public List<InternalWriteResult> batchWrite(List<WriteRequest> requests) {
+
+ SubmitRequest request = new SubmitRequest(
+ nextId(),
+ "write",
+ Arrays.asList(requests));
+
+ CompletableFuture<SubmitResponse> future = client.submit(request);
+ try {
+ List<InternalWriteResult> internalWriteResults = (List<InternalWriteResult>) future.get().result();
+ return internalWriteResults;
+ } catch (InterruptedException | ExecutionException e) {
+ throw new DatabaseException(e);
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
new file mode 100644
index 0000000..00ce12d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -0,0 +1,210 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import net.kuujo.copycat.Copycat;
+import net.kuujo.copycat.StateMachine;
+import net.kuujo.copycat.cluster.TcpCluster;
+import net.kuujo.copycat.cluster.TcpClusterConfig;
+import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.log.ChronicleLog;
+import net.kuujo.copycat.log.Log;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.netty.Endpoint;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.store.service.DatabaseAdminService;
+import org.onlab.onos.store.service.DatabaseException;
+import org.onlab.onos.store.service.DatabaseService;
+import org.onlab.onos.store.service.NoSuchTableException;
+import org.onlab.onos.store.service.OptimisticLockException;
+import org.onlab.onos.store.service.OptionalResult;
+import org.onlab.onos.store.service.PreconditionFailedException;
+import org.onlab.onos.store.service.ReadRequest;
+import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.WriteAborted;
+import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.WriteResult;
+import org.slf4j.Logger;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Strongly consistent and durable state management service based on
+ * Copycat implementation of Raft consensus protocol.
+ */
+@Component(immediate = true)
+@Service
+public class DatabaseManager implements DatabaseService, DatabaseAdminService {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ ClusterService clusterService;
+
+ public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
+
+ private Copycat copycat;
+ private DatabaseClient client;
+
+ @Activate
+ public void activate() {
+ TcpMember localMember =
+ new TcpMember(
+ clusterService.getLocalNode().ip().toString(),
+ clusterService.getLocalNode().tcpPort());
+ List<TcpMember> remoteMembers = Lists.newArrayList();
+
+ for (ControllerNode node : clusterService.getNodes()) {
+ TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
+ if (!member.equals(localMember)) {
+ remoteMembers.add(member);
+ }
+ }
+
+ // Configure the cluster.
+ TcpClusterConfig config = new TcpClusterConfig();
+
+ config.setLocalMember(localMember);
+ config.setRemoteMembers(remoteMembers.toArray(new TcpMember[]{}));
+
+ // Create the cluster.
+ TcpCluster cluster = new TcpCluster(config);
+
+ StateMachine stateMachine = new DatabaseStateMachine();
+ ControllerNode thisNode = clusterService.getLocalNode();
+ Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
+
+ copycat = new Copycat(stateMachine, consensusLog, cluster, new NettyProtocol());
+ copycat.start();
+
+ client = new DatabaseClient(new Endpoint(localMember.host(), localMember.port()));
+
+ log.info("Started.");
+ }
+
+ @Activate
+ public void deactivate() {
+ copycat.stop();
+ }
+
+ @Override
+ public boolean createTable(String name) {
+ return client.createTable(name);
+ }
+
+ @Override
+ public void dropTable(String name) {
+ client.dropTable(name);
+ }
+
+ @Override
+ public void dropAllTables() {
+ client.dropAllTables();
+ }
+
+ @Override
+ public List<String> listTables() {
+ return client.listTables();
+ }
+
+ @Override
+ public ReadResult read(ReadRequest request) {
+ return batchRead(Arrays.asList(request)).get(0).get();
+ }
+
+ @Override
+ public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
+ List<ReadRequest> batch) {
+ List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
+ for (InternalReadResult internalReadResult : client.batchRead(batch)) {
+ if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
+ readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
+ new NoSuchTableException()));
+ } else {
+ readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
+ internalReadResult.result()));
+ }
+ }
+ return readResults;
+ }
+
+ @Override
+ public WriteResult write(WriteRequest request) {
+ return batchWrite(Arrays.asList(request)).get(0).get();
+ }
+
+ @Override
+ public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
+ List<WriteRequest> batch) {
+ List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
+ for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
+ if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
+ writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
+ new NoSuchTableException()));
+ } else if (internalWriteResult.status() == InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE) {
+ writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
+ new OptimisticLockException()));
+ } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
+ // TODO: throw a different exception?
+ writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
+ new PreconditionFailedException()));
+ } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
+ writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
+ new WriteAborted()));
+ } else {
+ writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
+ internalWriteResult.result()));
+ }
+ }
+ return writeResults;
+
+ }
+
+ private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
+
+ private final R result;
+ private final DatabaseException exception;
+
+ public DatabaseOperationResult(R result) {
+ this.result = result;
+ this.exception = null;
+ }
+
+ public DatabaseOperationResult(DatabaseException exception) {
+ this.result = null;
+ this.exception = exception;
+ }
+
+ @Override
+ public R get() {
+ if (result != null) {
+ return result;
+ }
+ throw exception;
+ }
+
+ @Override
+ public boolean hasValidResult() {
+ return result != null;
+ }
+
+ @Override
+ public String toString() {
+ if (result != null) {
+ return result.toString();
+ } else {
+ return exception.toString();
+ }
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
new file mode 100644
index 0000000..cbca729
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -0,0 +1,169 @@
+package org.onlab.onos.store.service.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import net.kuujo.copycat.Command;
+import net.kuujo.copycat.Query;
+import net.kuujo.copycat.StateMachine;
+
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.service.ReadRequest;
+import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.WriteResult;
+import org.onlab.util.KryoNamespace;
+
+import com.google.common.collect.Maps;
+
+public class DatabaseStateMachine implements StateMachine {
+
+ public static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(VersionedValue.class)
+ .register(State.class)
+ .register(NettyProtocol.COMMON)
+ .build()
+ .populate(1);
+ }
+ };
+
+ private State state = new State();
+
+ @Command
+ public boolean createTable(String tableName) {
+ return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null;
+ }
+
+ @Command
+ public boolean dropTable(String tableName) {
+ return state.getTables().remove(tableName) != null;
+ }
+
+ @Command
+ public boolean dropAllTables() {
+ state.getTables().clear();
+ return true;
+ }
+
+ @Query
+ public Set<String> listTables() {
+ return state.getTables().keySet();
+ }
+
+ @Query
+ public List<InternalReadResult> read(List<ReadRequest> requests) {
+ List<InternalReadResult> results = new ArrayList<>(requests.size());
+ for (ReadRequest request : requests) {
+ Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+ if (table == null) {
+ results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
+ continue;
+ }
+ VersionedValue value = table.get(request.key());
+ results.add(new InternalReadResult(
+ InternalReadResult.Status.OK,
+ new ReadResult(
+ request.tableName(),
+ request.key(),
+ value)));
+ }
+ return results;
+ }
+
+ @Command
+ public List<InternalWriteResult> write(List<WriteRequest> requests) {
+ boolean abort = false;
+ List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
+ for (WriteRequest request : requests) {
+ Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+ if (table == null) {
+ validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE);
+ abort = true;
+ continue;
+ }
+ VersionedValue value = table.get(request.key());
+ if (value == null) {
+ if (request.oldValue() != null) {
+ validationResults.add(InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH);
+ abort = true;
+ continue;
+ } else if (request.previousVersion() >= 0) {
+ validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
+ abort = true;
+ continue;
+ }
+ }
+ if (request.previousVersion() >= 0 && value.version() != request.previousVersion()) {
+ validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
+ abort = true;
+ continue;
+ }
+
+ validationResults.add(InternalWriteResult.Status.OK);
+ }
+
+ List<InternalWriteResult> results = new ArrayList<>(requests.size());
+
+ if (abort) {
+ for (InternalWriteResult.Status validationResult : validationResults) {
+ if (validationResult == InternalWriteResult.Status.OK) {
+ results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
+ } else {
+ results.add(new InternalWriteResult(validationResult, null));
+ }
+ }
+ return results;
+ }
+
+ for (WriteRequest request : requests) {
+ Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+ synchronized (table) {
+ VersionedValue previousValue =
+ table.put(request.key(), new VersionedValue(request.newValue(), state.nextVersion()));
+ results.add(new InternalWriteResult(
+ InternalWriteResult.Status.OK,
+ new WriteResult(request.tableName(), request.key(), previousValue)));
+ }
+ }
+ return results;
+ }
+
+ public class State {
+
+ private final Map<String, Map<String, VersionedValue>> tables =
+ Maps.newHashMap();
+ private long versionCounter = 1;
+
+ Map<String, Map<String, VersionedValue>> getTables() {
+ return tables;
+ }
+
+ long nextVersion() {
+ return versionCounter++;
+ }
+ }
+
+ @Override
+ public byte[] takeSnapshot() {
+ try {
+ return SERIALIZER.encode(state);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ @Override
+ public void installSnapshot(byte[] data) {
+ try {
+ this.state = SERIALIZER.decode(data);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalReadResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalReadResult.java
new file mode 100644
index 0000000..f6fcf51
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalReadResult.java
@@ -0,0 +1,33 @@
+package org.onlab.onos.store.service.impl;
+
+import org.onlab.onos.store.service.ReadResult;
+
+public class InternalReadResult {
+
+ public enum Status {
+ OK,
+ NO_SUCH_TABLE
+ }
+
+ private final Status status;
+ private final ReadResult result;
+
+ public InternalReadResult(Status status, ReadResult result) {
+ this.status = status;
+ this.result = result;
+ }
+
+ public Status status() {
+ return status;
+ }
+
+ public ReadResult result() {
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "InternalReadResult [status=" + status + ", result=" + result
+ + "]";
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalWriteResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalWriteResult.java
new file mode 100644
index 0000000..e6dbb1f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalWriteResult.java
@@ -0,0 +1,30 @@
+package org.onlab.onos.store.service.impl;
+
+import org.onlab.onos.store.service.WriteResult;
+
+public class InternalWriteResult {
+
+ public enum Status {
+ OK,
+ ABORTED,
+ NO_SUCH_TABLE,
+ OPTIMISTIC_LOCK_FAILURE,
+ PREVIOUS_VALUE_MISMATCH
+ }
+
+ private final Status status;
+ private final WriteResult result;
+
+ public InternalWriteResult(Status status, WriteResult result) {
+ this.status = status;
+ this.result = result;
+ }
+
+ public Status status() {
+ return status;
+ }
+
+ public WriteResult result() {
+ return result;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java
new file mode 100644
index 0000000..9855ec6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocol.java
@@ -0,0 +1,145 @@
+package org.onlab.onos.store.service.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Vector;
+
+import net.kuujo.copycat.cluster.TcpClusterConfig;
+import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.internal.log.ConfigurationEntry;
+import net.kuujo.copycat.internal.log.CopycatEntry;
+import net.kuujo.copycat.internal.log.OperationEntry;
+import net.kuujo.copycat.internal.log.SnapshotEntry;
+import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PingResponse;
+import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.PollResponse;
+import net.kuujo.copycat.protocol.Response.Status;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.protocol.SyncResponse;
+import net.kuujo.copycat.spi.protocol.Protocol;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
+import net.kuujo.copycat.spi.protocol.ProtocolServer;
+
+import org.onlab.onos.store.serializers.ImmutableListSerializer;
+import org.onlab.onos.store.serializers.ImmutableMapSerializer;
+import org.onlab.onos.store.serializers.ImmutableSetSerializer;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.service.ReadRequest;
+import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.WriteResult;
+import org.onlab.util.KryoNamespace;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * {@link Protocol} based on {@link org.onlab.netty.NettyMessagingService}.
+ */
+public class NettyProtocol implements Protocol<TcpMember> {
+
+ public static final String COPYCAT_PING = "copycat-raft-consensus-ping";
+ public static final String COPYCAT_SYNC = "copycat-raft-consensus-sync";
+ public static final String COPYCAT_POLL = "copycat-raft-consensus-poll";
+ public static final String COPYCAT_SUBMIT = "copycat-raft-consensus-submit";
+
+ // TODO: make this configurable.
+ public static final long RETRY_INTERVAL_MILLIS = 2000;
+
+ private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
+ .register(PingRequest.class)
+ .register(PingResponse.class)
+ .register(PollRequest.class)
+ .register(PollResponse.class)
+ .register(SyncRequest.class)
+ .register(SyncResponse.class)
+ .register(SubmitRequest.class)
+ .register(SubmitResponse.class)
+ .register(Status.class)
+ .register(ConfigurationEntry.class)
+ .register(SnapshotEntry.class)
+ .register(CopycatEntry.class)
+ .register(OperationEntry.class)
+ .register(TcpClusterConfig.class)
+ .register(TcpMember.class)
+ .build();
+
+ // TODO: Move to the right place.
+ private static final KryoNamespace CRAFT = KryoNamespace.newBuilder()
+ .register(ReadRequest.class)
+ .register(WriteRequest.class)
+ .register(InternalReadResult.class)
+ .register(InternalWriteResult.class)
+ .register(InternalReadResult.Status.class)
+ .register(WriteResult.class)
+ .register(ReadResult.class)
+ .register(InternalWriteResult.Status.class)
+ .register(VersionedValue.class)
+ .build();
+
+ public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
+ .register(Arrays.asList().getClass(), new CollectionSerializer() {
+ @Override
+ @SuppressWarnings("rawtypes")
+ protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
+ return new ArrayList();
+ }
+ })
+ .register(ImmutableMap.class, new ImmutableMapSerializer())
+ .register(ImmutableList.class, new ImmutableListSerializer())
+ .register(ImmutableSet.class, new ImmutableSetSerializer())
+ .register(
+ Vector.class,
+ ArrayList.class,
+ Arrays.asList().getClass(),
+ HashMap.class,
+ HashSet.class,
+ LinkedList.class,
+ byte[].class)
+ .build();
+
+ public static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(COPYCAT)
+ .register(COMMON)
+ .register(CRAFT)
+ .build()
+ .populate(1);
+ }
+ };
+
+ private NettyProtocolServer server = null;
+
+ // FIXME: This is a total hack.Assumes
+ // ProtocolServer is initialized before ProtocolClient
+ protected NettyProtocolServer getServer() {
+ if (server == null) {
+ throw new IllegalStateException("ProtocolServer is not initialized yet!");
+ }
+ return server;
+ }
+
+ @Override
+ public ProtocolServer createServer(TcpMember member) {
+ server = new NettyProtocolServer(member);
+ return server;
+ }
+
+ @Override
+ public ProtocolClient createClient(TcpMember member) {
+ return new NettyProtocolClient(this, member);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java
new file mode 100644
index 0000000..a791990
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolClient.java
@@ -0,0 +1,148 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PingResponse;
+import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.PollResponse;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.protocol.SyncResponse;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
+
+import org.onlab.netty.Endpoint;
+import org.onlab.netty.NettyMessagingService;
+import org.slf4j.Logger;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * {@link NettyMessagingService} based Copycat protocol client.
+ */
+public class NettyProtocolClient implements ProtocolClient {
+
+ private final Logger log = getLogger(getClass());
+ private static final ThreadFactory THREAD_FACTORY =
+ new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
+
+ // Remote endpoint, this client instance is used
+ // for communicating with.
+ private final Endpoint remoteEp;
+ private final NettyMessagingService messagingService;
+
+ // TODO: Is 10 the right number of threads?
+ private static final ScheduledExecutorService THREAD_POOL =
+ new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
+
+ public NettyProtocolClient(NettyProtocol protocol, TcpMember member) {
+ this(new Endpoint(member.host(), member.port()), protocol.getServer().getNettyMessagingService());
+ }
+
+ public NettyProtocolClient(Endpoint remoteEp, NettyMessagingService messagingService) {
+ this.remoteEp = remoteEp;
+ this.messagingService = messagingService;
+ }
+
+ @Override
+ public CompletableFuture<PingResponse> ping(PingRequest request) {
+ return requestReply(request);
+ }
+
+ @Override
+ public CompletableFuture<SyncResponse> sync(SyncRequest request) {
+ return requestReply(request);
+ }
+
+ @Override
+ public CompletableFuture<PollResponse> poll(PollRequest request) {
+ return requestReply(request);
+ }
+
+ @Override
+ public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
+ return requestReply(request);
+ }
+
+ @Override
+ public CompletableFuture<Void> connect() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ public <I> String messageType(I input) {
+ Class<?> clazz = input.getClass();
+ if (clazz.equals(PollRequest.class)) {
+ return NettyProtocol.COPYCAT_POLL;
+ } else if (clazz.equals(SyncRequest.class)) {
+ return NettyProtocol.COPYCAT_SYNC;
+ } else if (clazz.equals(SubmitRequest.class)) {
+ return NettyProtocol.COPYCAT_SUBMIT;
+ } else if (clazz.equals(PingRequest.class)) {
+ return NettyProtocol.COPYCAT_PING;
+ } else {
+ throw new IllegalArgumentException("Unknown class " + clazz.getName());
+ }
+
+ }
+
+ private <I, O> CompletableFuture<O> requestReply(I request) {
+ CompletableFuture<O> future = new CompletableFuture<>();
+ THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
+ return future;
+ }
+
+ private class RPCTask<I, O> implements Runnable {
+
+ private final String messageType;
+ private final byte[] payload;
+
+ private final CompletableFuture<O> future;
+
+ public RPCTask(I request, CompletableFuture<O> future) {
+ this.messageType = messageType(request);
+ this.payload = NettyProtocol.SERIALIZER.encode(request);
+ this.future = future;
+ }
+
+ @Override
+ public void run() {
+ try {
+ byte[] response = messagingService
+ .sendAndReceive(remoteEp, messageType, payload)
+ .get(NettyProtocol.RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+ future.complete(NettyProtocol.SERIALIZER.decode(response));
+
+ } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
+ if (messageType.equals(NettyProtocol.COPYCAT_SYNC) ||
+ messageType.equals(NettyProtocol.COPYCAT_PING)) {
+ log.warn("Request to {} failed. Will retry "
+ + "in {} ms", remoteEp, NettyProtocol.RETRY_INTERVAL_MILLIS);
+ THREAD_POOL.schedule(
+ this,
+ NettyProtocol.RETRY_INTERVAL_MILLIS,
+ TimeUnit.MILLISECONDS);
+ } else {
+ future.completeExceptionally(e);
+ }
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ }
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java
new file mode 100644
index 0000000..d06999e
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/NettyProtocolServer.java
@@ -0,0 +1,115 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import net.kuujo.copycat.cluster.TcpMember;
+import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.RequestHandler;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.spi.protocol.ProtocolServer;
+
+import org.onlab.netty.Message;
+import org.onlab.netty.MessageHandler;
+import org.onlab.netty.NettyMessagingService;
+import org.slf4j.Logger;
+
+/**
+ * {@link NettyMessagingService} based Copycat protocol server.
+ */
+public class NettyProtocolServer implements ProtocolServer {
+
+ private final Logger log = getLogger(getClass());
+
+ private final NettyMessagingService messagingService;
+ private RequestHandler handler;
+
+
+ public NettyProtocolServer(TcpMember member) {
+ messagingService = new NettyMessagingService(member.host(), member.port());
+
+ messagingService.registerHandler(NettyProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
+ messagingService.registerHandler(NettyProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
+ messagingService.registerHandler(NettyProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
+ messagingService.registerHandler(NettyProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
+ }
+
+ protected NettyMessagingService getNettyMessagingService() {
+ return messagingService;
+ }
+
+ @Override
+ public void requestHandler(RequestHandler handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public CompletableFuture<Void> listen() {
+ try {
+ messagingService.activate();
+ return CompletableFuture.completedFuture(null);
+ } catch (Exception e) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ try {
+ messagingService.deactivate();
+ future.complete(null);
+ return future;
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ return future;
+ }
+ }
+
+ private class CopycatMessageHandler<T> implements MessageHandler {
+
+ @Override
+ public void handle(Message message) throws IOException {
+ T request = NettyProtocol.SERIALIZER.decode(message.payload());
+ if (request.getClass().equals(PingRequest.class)) {
+ handler.ping((PingRequest) request).whenComplete((response, error) -> {
+ try {
+ message.respond(NettyProtocol.SERIALIZER.encode(response));
+ } catch (Exception e) {
+ log.error("Failed to respond to ping request", e);
+ }
+ });
+ } else if (request.getClass().equals(PollRequest.class)) {
+ handler.poll((PollRequest) request).whenComplete((response, error) -> {
+ try {
+ message.respond(NettyProtocol.SERIALIZER.encode(response));
+ } catch (Exception e) {
+ log.error("Failed to respond to poll request", e);
+ }
+ });
+ } else if (request.getClass().equals(SyncRequest.class)) {
+ handler.sync((SyncRequest) request).whenComplete((response, error) -> {
+ try {
+ message.respond(NettyProtocol.SERIALIZER.encode(response));
+ } catch (Exception e) {
+ log.error("Failed to respond to sync request", e);
+ }
+ });
+ } else if (request.getClass().equals(SubmitRequest.class)) {
+ handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
+ try {
+ message.respond(NettyProtocol.SERIALIZER.encode(response));
+ } catch (Exception e) {
+ log.error("Failed to respond to submit request", e);
+ }
+ });
+ }
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/VersionedValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/VersionedValue.java
new file mode 100644
index 0000000..31bdcc2
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/VersionedValue.java
@@ -0,0 +1,44 @@
+package org.onlab.onos.store.service.impl;
+
+import java.util.Arrays;
+
+/**
+ * Wrapper object that holds the object (as byte array) and its version.
+ */
+public class VersionedValue {
+
+ private final byte[] value;
+ private final long version;
+
+ /**
+ * Creates a new instance with the specified value and version.
+ * @param value
+ * @param version
+ */
+ public VersionedValue(byte[] value, long version) {
+ this.value = value;
+ this.version = version;
+ }
+
+ /**
+ * Returns the value.
+ * @return value.
+ */
+ public byte[] value() {
+ return value;
+ }
+
+ /**
+ * Returns the version.
+ * @return version.
+ */
+ public long version() {
+ return version;
+ }
+
+ @Override
+ public String toString() {
+ return "VersionedValue [value=" + Arrays.toString(value) + ", version="
+ + version + "]";
+ }
+}
diff --git a/openflow/api/src/main/java/org/onlab/onos/openflow/controller/driver/RoleReplyInfo.java b/openflow/api/src/main/java/org/onlab/onos/openflow/controller/driver/RoleReplyInfo.java
index b408c44..6efb333 100644
--- a/openflow/api/src/main/java/org/onlab/onos/openflow/controller/driver/RoleReplyInfo.java
+++ b/openflow/api/src/main/java/org/onlab/onos/openflow/controller/driver/RoleReplyInfo.java
@@ -32,9 +32,15 @@
this.genId = genId;
this.xid = xid;
}
- public RoleState getRole() { return role; }
- public U64 getGenId() { return genId; }
- public long getXid() { return xid; }
+ public RoleState getRole() {
+ return role;
+ }
+ public U64 getGenId() {
+ return genId;
+ }
+ public long getXid() {
+ return xid;
+ }
@Override
public String toString() {
return "[Role:" + role + " GenId:" + genId + " Xid:" + xid + "]";
diff --git a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/RoleManager.java b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/RoleManager.java
index 6e5b236..734e0ba 100644
--- a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/RoleManager.java
+++ b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/RoleManager.java
@@ -347,7 +347,7 @@
RoleState role = null;
OFNiciraControllerRole ncr = nrr.getRole();
- switch(ncr) {
+ switch (ncr) {
case ROLE_MASTER:
role = RoleState.MASTER;
break;
@@ -383,7 +383,7 @@
throws SwitchStateException {
OFControllerRole cr = rrmsg.getRole();
RoleState role = null;
- switch(cr) {
+ switch (cr) {
case ROLE_EQUAL:
role = RoleState.EQUAL;
break;
diff --git a/pom.xml b/pom.xml
index e84c846..dfc86a3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -414,7 +414,7 @@
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
- <version>2.3.7</version>
+ <version>2.5.3</version>
<extensions>true</extensions>
</plugin>
@@ -493,6 +493,12 @@
<artifactId>onos-build-conf</artifactId>
<version>1.0</version>
</dependency>
+ <!-- For Java 8 lambda support-->
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>5.9</version>
+ </dependency>
</dependencies>
<configuration>
<configLocation>onos/checkstyle.xml</configLocation>