MapDB-backed Copycat log implementation
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index 248e763..f0f37c7 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -64,6 +64,12 @@
-->
<dependency>
+ <groupId>org.mapdb</groupId>
+ <artifactId>mapdb</artifactId>
+ <version>1.0.6</version>
+ </dependency>
+
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
index 7d94847..b3eaeb4 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
@@ -3,12 +3,17 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.PollResponse;
import net.kuujo.copycat.protocol.RequestHandler;
import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.protocol.SyncResponse;
import net.kuujo.copycat.spi.protocol.ProtocolServer;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
@@ -57,37 +62,37 @@
public void handle(ClusterMessage message) {
T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
if (request.getClass().equals(PingRequest.class)) {
- handler.ping((PingRequest) request).whenComplete((response, error) -> {
- try {
- message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
- } catch (Exception e) {
- log.error("Failed to respond to ping request", e);
- }
- });
+ handler.ping((PingRequest) request).whenComplete(new PostExecutionTask<PingResponse>(message));
} else if (request.getClass().equals(PollRequest.class)) {
- handler.poll((PollRequest) request).whenComplete((response, error) -> {
- try {
- message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
- } catch (Exception e) {
- log.error("Failed to respond to poll request", e);
- }
- });
+ handler.poll((PollRequest) request).whenComplete(new PostExecutionTask<PollResponse>(message));
} else if (request.getClass().equals(SyncRequest.class)) {
- handler.sync((SyncRequest) request).whenComplete((response, error) -> {
- try {
- message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
- } catch (Exception e) {
- log.error("Failed to respond to sync request", e);
- }
- });
+ handler.sync((SyncRequest) request).whenComplete(new PostExecutionTask<SyncResponse>(message));
} else if (request.getClass().equals(SubmitRequest.class)) {
- handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
+ handler.submit((SubmitRequest) request).whenComplete(new PostExecutionTask<SubmitResponse>(message));
+ } else {
+ throw new IllegalStateException("Unknown request type: " + request.getClass().getName());
+ }
+ }
+
+ private class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
+
+ private final ClusterMessage message;
+
+ public PostExecutionTask(ClusterMessage message) {
+ this.message = message;
+ }
+
+ @Override
+ public void accept(R response, Throwable t) {
+ if (t != null) {
+ log.error("Processing for " + message.subject() + " failed.", t);
+ } else {
try {
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
- log.error("Failed to respond to submit request", e);
+ log.error("Failed to respond to " + response.getClass().getName(), e);
}
- });
+ }
}
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index d132b7c..9662976 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -172,8 +172,8 @@
try {
return SERIALIZER.encode(state);
} catch (Exception e) {
- log.error("Snapshot serialization error", e);
- return null;
+ log.error("Failed to take snapshot", e);
+ throw new SnapshotException(e);
}
}
@@ -182,7 +182,8 @@
try {
this.state = SERIALIZER.decode(data);
} catch (Exception e) {
- log.error("Snapshot deserialization error", e);
+ log.error("Failed to install from snapshot", e);
+ throw new SnapshotException(e);
}
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
new file mode 100644
index 0000000..893c311
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
@@ -0,0 +1,280 @@
+package org.onlab.onos.store.service.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentNavigableMap;
+
+import net.kuujo.copycat.log.Entry;
+import net.kuujo.copycat.log.Log;
+import net.kuujo.copycat.log.LogIndexOutOfBoundsException;
+
+import org.mapdb.Atomic;
+import org.mapdb.BTreeMap;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.TxBlock;
+import org.mapdb.TxMaker;
+import org.onlab.onos.store.serializers.StoreSerializer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * MapDB based log implementation.
+ */
+public class MapDBLog implements Log {
+
+ private final File dbFile;
+ private TxMaker txMaker;
+ private final StoreSerializer serializer;
+ private static final String LOG_NAME = "log";
+ private static final String SIZE_FIELD_NAME = "size";
+
+ public MapDBLog(File dbFile, StoreSerializer serializer) {
+ this.dbFile = dbFile;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public void open() throws IOException {
+ txMaker = DBMaker
+ .newFileDB(dbFile)
+ .makeTxMaker();
+ }
+
+ @Override
+ public void close() throws IOException {
+ assertIsOpen();
+ txMaker.close();
+ txMaker = null;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return txMaker != null;
+ }
+
+ protected void assertIsOpen() {
+ checkState(isOpen(), "The log is not currently open.");
+ }
+
+ @Override
+ public long appendEntry(Entry entry) {
+ checkArgument(entry != null, "expecting non-null entry");
+ return appendEntries(entry).get(0);
+ }
+
+ @Override
+ public List<Long> appendEntries(Entry... entries) {
+ checkArgument(entries != null, "expecting non-null entries");
+ return appendEntries(Arrays.asList(entries));
+ }
+
+ @Override
+ public List<Long> appendEntries(List<Entry> entries) {
+ assertIsOpen();
+ checkArgument(entries != null, "expecting non-null entries");
+ final List<Long> indices = Lists.newArrayList();
+
+ txMaker.execute(new TxBlock() {
+ @Override
+ public void tx(DB db) {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+ long nextIndex = log.isEmpty() ? 1 : log.lastKey() + 1;
+ for (Entry entry : entries) {
+ byte[] entryBytes = serializer.encode(entry);
+ log.put(nextIndex, entryBytes);
+ size.addAndGet(entryBytes.length);
+ indices.add(nextIndex);
+ nextIndex++;
+ }
+ }
+ });
+
+ return indices;
+ }
+
+ @Override
+ public boolean containsEntry(long index) {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.containsKey(index);
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public void delete() throws IOException {
+ assertIsOpen();
+ txMaker.execute(new TxBlock() {
+ @Override
+ public void tx(DB db) {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+ log.clear();
+ size.set(0);
+ }
+ });
+ }
+
+ @Override
+ public <T extends Entry> T firstEntry() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.isEmpty() ? null : serializer.decode(log.firstEntry().getValue());
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public long firstIndex() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.isEmpty() ? 0 : log.firstKey();
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public <T extends Entry> List<T> getEntries(long from, long to) {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ if (log.isEmpty()) {
+ throw new LogIndexOutOfBoundsException("Log is empty");
+ } else if (from < log.firstKey()) {
+ throw new LogIndexOutOfBoundsException("From index out of bounds.");
+ } else if (to > log.lastKey()) {
+ throw new LogIndexOutOfBoundsException("To index out of bounds.");
+ }
+ List<T> entries = new ArrayList<>((int) (to - from + 1));
+ for (long i = from; i <= to; i++) {
+ T entry = serializer.decode(log.get(i));
+ entries.add(entry);
+ }
+ return entries;
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public <T extends Entry> T getEntry(long index) {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ byte[] entryBytes = log.get(index);
+ return entryBytes == null ? null : serializer.decode(entryBytes);
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.isEmpty();
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public <T extends Entry> T lastEntry() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.isEmpty() ? null : serializer.decode(log.lastEntry().getValue());
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public long lastIndex() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.isEmpty() ? 0 : log.lastKey();
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public void removeAfter(long index) {
+ assertIsOpen();
+ txMaker.execute(new TxBlock() {
+ @Override
+ public void tx(DB db) {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+ long startIndex = index + 1;
+ long endIndex = log.isEmpty() ? index : log.lastKey();
+ for (long i = startIndex; i <= endIndex; ++i) {
+ byte[] entryBytes = log.remove(i);
+ size.addAndGet(-1L * entryBytes.length);
+ }
+ }
+ });
+ }
+
+ @Override
+ public long size() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+ return size.get();
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public void sync() throws IOException {
+ assertIsOpen();
+ }
+
+ @Override
+ public void compact(long index, Entry entry) throws IOException {
+
+ assertIsOpen();
+ txMaker.execute(new TxBlock() {
+ @Override
+ public void tx(DB db) {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+ ConcurrentNavigableMap<Long, byte[]> headMap = log.headMap(index);
+ long deletedBytes = headMap.keySet().stream().mapToLong(i -> log.remove(i).length).sum();
+ size.addAndGet(-1 * deletedBytes);
+ byte[] entryBytes = serializer.encode(entry);
+ byte[] existingEntry = log.put(index, entryBytes);
+ size.addAndGet(entryBytes.length - (existingEntry == null ? 0 : existingEntry.length));
+ db.compact();
+ }
+ });
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/SnapshotException.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/SnapshotException.java
new file mode 100644
index 0000000..4cfc13b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/SnapshotException.java
@@ -0,0 +1,13 @@
+package org.onlab.onos.store.service.impl;
+
+import org.onlab.onos.store.service.DatabaseException;
+
+/**
+ * Exception that indicates a problem with the state machine snapshotting.
+ */
+@SuppressWarnings("serial")
+public class SnapshotException extends DatabaseException {
+ public SnapshotException(Throwable t) {
+ super(t);
+ }
+}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java
new file mode 100644
index 0000000..75beefd
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java
@@ -0,0 +1,193 @@
+package org.onlab.onos.store.service.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+
+import net.kuujo.copycat.internal.log.OperationEntry;
+import net.kuujo.copycat.log.Entry;
+import net.kuujo.copycat.log.Log;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.onos.store.serializers.StoreSerializer;
+
+import com.google.common.testing.EqualsTester;
+
+/**
+ * Test the MapDBLog implementation.
+ */
+public class MapDBLogTest {
+
+ private static final String DB_FILE_NAME = "mapdbTest";
+ private static final StoreSerializer SERIALIZER = ClusterMessagingProtocol.SERIALIZER;
+ private static final Entry TEST_ENTRY1 = new OperationEntry(1, "test1");
+ private static final Entry TEST_ENTRY2 = new OperationEntry(2, "test12");
+ private static final Entry TEST_ENTRY3 = new OperationEntry(3, "test123");
+ private static final Entry TEST_ENTRY4 = new OperationEntry(4, "test1234");
+
+ private static final Entry TEST_SNAPSHOT_ENTRY = new OperationEntry(5, "snapshot");
+
+ private static final long TEST_ENTRY1_SIZE = SERIALIZER.encode(TEST_ENTRY1).length;
+ private static final long TEST_ENTRY2_SIZE = SERIALIZER.encode(TEST_ENTRY2).length;
+ private static final long TEST_ENTRY3_SIZE = SERIALIZER.encode(TEST_ENTRY3).length;
+ private static final long TEST_ENTRY4_SIZE = SERIALIZER.encode(TEST_ENTRY4).length;
+
+ private static final long TEST_SNAPSHOT_ENTRY_SIZE = SERIALIZER.encode(TEST_SNAPSHOT_ENTRY).length;
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Files.deleteIfExists(new File(DB_FILE_NAME).toPath());
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testAssertOpen() {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.size();
+ }
+
+ @Test
+ public void testAppendEntry() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.open();
+ log.appendEntry(TEST_ENTRY1);
+ OperationEntry first = log.firstEntry();
+ OperationEntry last = log.lastEntry();
+ new EqualsTester()
+ .addEqualityGroup(first, last, TEST_ENTRY1)
+ .testEquals();
+ Assert.assertEquals(TEST_ENTRY1_SIZE, log.size());
+ Assert.assertEquals(1, log.firstIndex());
+ Assert.assertEquals(1, log.lastIndex());
+ }
+
+ @Test
+ public void testAppendEntries() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.open();
+ log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3);
+ OperationEntry first = log.firstEntry();
+ OperationEntry last = log.lastEntry();
+ new EqualsTester()
+ .addEqualityGroup(first, TEST_ENTRY1)
+ .addEqualityGroup(last, TEST_ENTRY3)
+ .testEquals();
+ Assert.assertEquals(TEST_ENTRY1_SIZE + TEST_ENTRY2_SIZE + TEST_ENTRY3_SIZE, log.size());
+ Assert.assertEquals(1, log.firstIndex());
+ Assert.assertEquals(3, log.lastIndex());
+ Assert.assertTrue(log.containsEntry(1));
+ Assert.assertTrue(log.containsEntry(2));
+ }
+
+ @Test
+ public void testDelete() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.open();
+ log.appendEntries(TEST_ENTRY1, TEST_ENTRY2);
+ log.delete();
+ Assert.assertEquals(0, log.size());
+ Assert.assertTrue(log.isEmpty());
+ Assert.assertEquals(0, log.firstIndex());
+ Assert.assertNull(log.firstEntry());
+ Assert.assertEquals(0, log.lastIndex());
+ Assert.assertNull(log.lastEntry());
+ }
+
+ @Test
+ public void testGetEntries() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.open();
+ log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
+ Assert.assertEquals(
+ TEST_ENTRY1_SIZE +
+ TEST_ENTRY2_SIZE +
+ TEST_ENTRY3_SIZE +
+ TEST_ENTRY4_SIZE, log.size());
+
+ List<Entry> entries = log.getEntries(2, 3);
+ new EqualsTester()
+ .addEqualityGroup(log.getEntry(4), TEST_ENTRY4)
+ .addEqualityGroup(entries.get(0), TEST_ENTRY2)
+ .addEqualityGroup(entries.get(1), TEST_ENTRY3)
+ .testEquals();
+ }
+
+ @Test
+ public void testRemoveAfter() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.open();
+ log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
+ log.removeAfter(1);
+ Assert.assertEquals(TEST_ENTRY1_SIZE, log.size());
+ new EqualsTester()
+ .addEqualityGroup(log.firstEntry(), log.lastEntry(), TEST_ENTRY1)
+ .testEquals();
+ }
+
+ @Test
+ public void testAddAfterRemove() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.open();
+ log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
+ log.removeAfter(1);
+ log.appendEntry(TEST_ENTRY4);
+ Assert.assertEquals(TEST_ENTRY1_SIZE + TEST_ENTRY4_SIZE, log.size());
+ new EqualsTester()
+ .addEqualityGroup(log.firstEntry(), TEST_ENTRY1)
+ .addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
+ .addEqualityGroup(log.size(), TEST_ENTRY1_SIZE + TEST_ENTRY4_SIZE)
+ .testEquals();
+ }
+
+ @Test
+ public void testClose() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ Assert.assertFalse(log.isOpen());
+ log.open();
+ Assert.assertTrue(log.isOpen());
+ log.close();
+ Assert.assertFalse(log.isOpen());
+ }
+
+ @Test
+ public void testReopen() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.open();
+ log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
+ log.close();
+ log.open();
+
+ new EqualsTester()
+ .addEqualityGroup(log.firstEntry(), TEST_ENTRY1)
+ .addEqualityGroup(log.getEntry(2), TEST_ENTRY2)
+ .addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
+ .addEqualityGroup(log.size(),
+ TEST_ENTRY1_SIZE +
+ TEST_ENTRY2_SIZE +
+ TEST_ENTRY3_SIZE +
+ TEST_ENTRY4_SIZE)
+ .testEquals();
+ }
+
+ @Test
+ public void testCompact() throws IOException {
+ Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+ log.open();
+ log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
+ log.compact(3, TEST_SNAPSHOT_ENTRY);
+ new EqualsTester()
+ .addEqualityGroup(log.firstEntry(), TEST_SNAPSHOT_ENTRY)
+ .addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
+ .addEqualityGroup(log.size(),
+ TEST_SNAPSHOT_ENTRY_SIZE +
+ TEST_ENTRY4_SIZE)
+ .testEquals();
+ }
+}