MapDB backed Copycat log implementation
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index 248e763..f0f37c7 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -64,6 +64,12 @@
 -->
 
         <dependency>
+            <groupId>org.mapdb</groupId>
+            <artifactId>mapdb</artifactId>
+            <version>1.0.6</version>
+        </dependency>
+
+        <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
index 7d94847..b3eaeb4 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
@@ -3,12 +3,17 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
 
 import net.kuujo.copycat.protocol.PingRequest;
+import net.kuujo.copycat.protocol.PingResponse;
 import net.kuujo.copycat.protocol.PollRequest;
+import net.kuujo.copycat.protocol.PollResponse;
 import net.kuujo.copycat.protocol.RequestHandler;
 import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
 import net.kuujo.copycat.protocol.SyncRequest;
+import net.kuujo.copycat.protocol.SyncResponse;
 import net.kuujo.copycat.spi.protocol.ProtocolServer;
 
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
@@ -57,37 +62,37 @@
         public void handle(ClusterMessage message) {
             T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
             if (request.getClass().equals(PingRequest.class)) {
-                handler.ping((PingRequest) request).whenComplete((response, error) -> {
-                    try {
-                        message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
-                    } catch (Exception e) {
-                        log.error("Failed to respond to ping request", e);
-                    }
-                });
+                handler.ping((PingRequest) request).whenComplete(new PostExecutionTask<PingResponse>(message));
             } else if (request.getClass().equals(PollRequest.class)) {
-                handler.poll((PollRequest) request).whenComplete((response, error) -> {
-                    try {
-                        message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
-                    } catch (Exception e) {
-                        log.error("Failed to respond to poll request", e);
-                    }
-                });
+                handler.poll((PollRequest) request).whenComplete(new PostExecutionTask<PollResponse>(message));
             } else if (request.getClass().equals(SyncRequest.class)) {
-                handler.sync((SyncRequest) request).whenComplete((response, error) -> {
-                    try {
-                        message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
-                    } catch (Exception e) {
-                        log.error("Failed to respond to sync request", e);
-                    }
-                });
+                handler.sync((SyncRequest) request).whenComplete(new PostExecutionTask<SyncResponse>(message));
             } else if (request.getClass().equals(SubmitRequest.class)) {
-                handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
+                handler.submit((SubmitRequest) request).whenComplete(new PostExecutionTask<SubmitResponse>(message));
+            } else {
+                throw new IllegalStateException("Unknown request type: " + request.getClass().getName());
+            }
+        }
+
+        private class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
+
+            private final ClusterMessage message;
+
+            public PostExecutionTask(ClusterMessage message) {
+                this.message = message;
+            }
+
+            @Override
+            public void accept(R response, Throwable t) {
+                if (t != null) {
+                    log.error("Processing for " + message.subject() + " failed.", t);
+                } else {
                     try {
                         message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
                     } catch (Exception e) {
-                        log.error("Failed to respond to submit request", e);
+                        log.error("Failed to respond to " + response.getClass().getName(), e);
                     }
-                });
+                }
             }
         }
     }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index d132b7c..9662976 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -172,8 +172,8 @@
         try {
             return SERIALIZER.encode(state);
         } catch (Exception e) {
-            log.error("Snapshot serialization error", e);
-            return null;
+            log.error("Failed to take snapshot", e);
+            throw new SnapshotException(e);
         }
     }
 
@@ -182,7 +182,8 @@
         try {
             this.state = SERIALIZER.decode(data);
         } catch (Exception e) {
-            log.error("Snapshot deserialization error", e);
+            log.error("Failed to install from snapshot", e);
+            throw new SnapshotException(e);
         }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
new file mode 100644
index 0000000..893c311
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
@@ -0,0 +1,280 @@
+package org.onlab.onos.store.service.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentNavigableMap;
+
+import net.kuujo.copycat.log.Entry;
+import net.kuujo.copycat.log.Log;
+import net.kuujo.copycat.log.LogIndexOutOfBoundsException;
+
+import org.mapdb.Atomic;
+import org.mapdb.BTreeMap;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.TxBlock;
+import org.mapdb.TxMaker;
+import org.onlab.onos.store.serializers.StoreSerializer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * MapDB based log implementation.
+ */
+public class MapDBLog implements Log {
+
+    private final File dbFile;
+    private TxMaker txMaker;
+    private final StoreSerializer serializer;
+    private static final String LOG_NAME = "log";
+    private static final String SIZE_FIELD_NAME = "size";
+
+    public MapDBLog(File dbFile, StoreSerializer serializer) {
+        this.dbFile = dbFile;
+        this.serializer = serializer;
+    }
+
+    @Override
+    public void open() throws IOException {
+        txMaker = DBMaker
+                .newFileDB(dbFile)
+                .makeTxMaker();
+    }
+
+    @Override
+    public void close() throws IOException {
+        assertIsOpen();
+        txMaker.close();
+        txMaker = null;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return txMaker != null;
+    }
+
+    protected void assertIsOpen() {
+        checkState(isOpen(), "The log is not currently open.");
+    }
+
+    @Override
+    public long appendEntry(Entry entry) {
+        checkArgument(entry != null, "expecting non-null entry");
+        return appendEntries(entry).get(0);
+    }
+
+    @Override
+    public List<Long> appendEntries(Entry... entries) {
+        checkArgument(entries != null, "expecting non-null entries");
+        return appendEntries(Arrays.asList(entries));
+    }
+
+    @Override
+    public List<Long> appendEntries(List<Entry> entries) {
+        assertIsOpen();
+        checkArgument(entries != null, "expecting non-null entries");
+        final List<Long> indices = Lists.newArrayList();
+
+        txMaker.execute(new TxBlock() {
+            @Override
+            public void tx(DB db) {
+                BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+                Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+                long nextIndex = log.isEmpty() ? 1 : log.lastKey() + 1;
+                for (Entry entry : entries) {
+                    byte[] entryBytes = serializer.encode(entry);
+                    log.put(nextIndex, entryBytes);
+                    size.addAndGet(entryBytes.length);
+                    indices.add(nextIndex);
+                    nextIndex++;
+                }
+            }
+        });
+
+        return indices;
+    }
+
+    @Override
+    public boolean containsEntry(long index) {
+        assertIsOpen();
+        DB db = txMaker.makeTx();
+        try {
+            BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+            return log.containsKey(index);
+        } finally {
+            db.close();
+        }
+    }
+
+    @Override
+    public void delete() throws IOException {
+        assertIsOpen();
+        txMaker.execute(new TxBlock() {
+            @Override
+            public void tx(DB db) {
+                BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+                Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+                log.clear();
+                size.set(0);
+            }
+        });
+    }
+
+    @Override
+    public <T extends Entry> T firstEntry() {
+        assertIsOpen();
+        DB db = txMaker.makeTx();
+        try {
+            BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+            return log.isEmpty() ? null : serializer.decode(log.firstEntry().getValue());
+        } finally {
+            db.close();
+        }
+    }
+
+    @Override
+    public long firstIndex() {
+        assertIsOpen();
+        DB db = txMaker.makeTx();
+        try {
+            BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+            return log.isEmpty() ? 0 : log.firstKey();
+        } finally {
+            db.close();
+        }
+    }
+
+    @Override
+    public <T extends Entry> List<T> getEntries(long from, long to) {
+        assertIsOpen();
+        DB db = txMaker.makeTx();
+        try {
+            BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+            if (log.isEmpty()) {
+                throw new LogIndexOutOfBoundsException("Log is empty");
+            } else if (from < log.firstKey()) {
+                throw new LogIndexOutOfBoundsException("From index out of bounds.");
+            } else if (to > log.lastKey()) {
+                throw new LogIndexOutOfBoundsException("To index out of bounds.");
+            }
+            List<T> entries = new ArrayList<>((int) (to - from + 1));
+            for (long i = from; i <= to; i++) {
+                T entry = serializer.decode(log.get(i));
+                entries.add(entry);
+            }
+            return entries;
+        } finally {
+            db.close();
+        }
+    }
+
+    @Override
+    public <T extends Entry> T getEntry(long index) {
+        assertIsOpen();
+        DB db = txMaker.makeTx();
+        try {
+            BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+            byte[] entryBytes = log.get(index);
+            return entryBytes == null ? null : serializer.decode(entryBytes);
+        } finally {
+            db.close();
+        }
+    }
+
+    @Override
+    public boolean isEmpty() {
+        assertIsOpen();
+        DB db = txMaker.makeTx();
+        try {
+            BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+            return log.isEmpty();
+        } finally {
+            db.close();
+        }
+    }
+
+    @Override
+    public <T extends Entry> T lastEntry() {
+        assertIsOpen();
+        DB db = txMaker.makeTx();
+        try {
+            BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+            return log.isEmpty() ? null : serializer.decode(log.lastEntry().getValue());
+        } finally {
+            db.close();
+        }
+    }
+
+    @Override
+    public long lastIndex() {
+        assertIsOpen();
+        DB db = txMaker.makeTx();
+        try {
+            BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+            return log.isEmpty() ? 0 : log.lastKey();
+        } finally {
+            db.close();
+        }
+    }
+
+    @Override
+    public void removeAfter(long index) {
+        assertIsOpen();
+        txMaker.execute(new TxBlock() {
+            @Override
+            public void tx(DB db) {
+                BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+                Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+                long startIndex = index + 1;
+                long endIndex = log.lastKey();
+                for (long i = startIndex; i <= endIndex; ++i) {
+                    byte[] entryBytes = log.remove(i);
+                    size.addAndGet(-1L * entryBytes.length);
+                }
+            }
+        });
+    }
+
+    @Override
+    public long size() {
+        assertIsOpen();
+        DB db = txMaker.makeTx();
+        try {
+            Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+            return size.get();
+        } finally {
+            db.close();
+        }
+    }
+
+    @Override
+    public void sync() throws IOException {
+        assertIsOpen();
+    }
+
+    @Override
+    public void compact(long index, Entry entry) throws IOException {
+
+        assertIsOpen();
+        txMaker.execute(new TxBlock() {
+            @Override
+            public void tx(DB db) {
+                BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+                Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+                ConcurrentNavigableMap<Long, byte[]> headMap = log.headMap(index);
+                long deletedBytes = headMap.keySet().stream().mapToLong(i -> log.remove(i).length).sum();
+                size.addAndGet(-1 * deletedBytes);
+                byte[] entryBytes = serializer.encode(entry);
+                byte[] existingEntry = log.put(index, entryBytes);
+                size.addAndGet(entryBytes.length - existingEntry.length);
+                db.compact();
+            }
+        });
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/SnapshotException.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/SnapshotException.java
new file mode 100644
index 0000000..4cfc13b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/SnapshotException.java
@@ -0,0 +1,13 @@
+package org.onlab.onos.store.service.impl;
+
+import org.onlab.onos.store.service.DatabaseException;
+
+/**
+ * Exception that indicates a problem with the state machine snapshotting.
+ */
+@SuppressWarnings("serial")
+public class SnapshotException extends DatabaseException {
+    public SnapshotException(Throwable t) {
+        super(t);
+    }
+}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java
new file mode 100644
index 0000000..75beefd
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/service/impl/MapDBLogTest.java
@@ -0,0 +1,193 @@
+package org.onlab.onos.store.service.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+
+import net.kuujo.copycat.internal.log.OperationEntry;
+import net.kuujo.copycat.log.Entry;
+import net.kuujo.copycat.log.Log;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.onos.store.serializers.StoreSerializer;
+
+import com.google.common.testing.EqualsTester;
+
+/**
+ * Test the MapDBLog implementation.
+ */
+public class MapDBLogTest {
+
+    private static final String DB_FILE_NAME = "mapdbTest";
+    private static final StoreSerializer SERIALIZER = ClusterMessagingProtocol.SERIALIZER;
+    private static final Entry TEST_ENTRY1 = new OperationEntry(1, "test1");
+    private static final Entry TEST_ENTRY2 = new OperationEntry(2, "test12");
+    private static final Entry TEST_ENTRY3 = new OperationEntry(3, "test123");
+    private static final Entry TEST_ENTRY4 = new OperationEntry(4, "test1234");
+
+    private static final Entry TEST_SNAPSHOT_ENTRY = new OperationEntry(5, "snapshot");
+
+    private static final long TEST_ENTRY1_SIZE = SERIALIZER.encode(TEST_ENTRY1).length;
+    private static final long TEST_ENTRY2_SIZE = SERIALIZER.encode(TEST_ENTRY2).length;
+    private static final long TEST_ENTRY3_SIZE = SERIALIZER.encode(TEST_ENTRY3).length;
+    private static final long TEST_ENTRY4_SIZE = SERIALIZER.encode(TEST_ENTRY4).length;
+
+    private static final long TEST_SNAPSHOT_ENTRY_SIZE = SERIALIZER.encode(TEST_SNAPSHOT_ENTRY).length;
+
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        Files.deleteIfExists(new File(DB_FILE_NAME).toPath());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testAssertOpen() {
+        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+        log.size();
+    }
+
+    @Test
+    public void testAppendEntry() throws IOException {
+        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+        log.open();
+        log.appendEntry(TEST_ENTRY1);
+        OperationEntry first = log.firstEntry();
+        OperationEntry last = log.lastEntry();
+        new EqualsTester()
+            .addEqualityGroup(first, last, TEST_ENTRY1)
+            .testEquals();
+        Assert.assertEquals(TEST_ENTRY1_SIZE, log.size());
+        Assert.assertEquals(1, log.firstIndex());
+        Assert.assertEquals(1, log.lastIndex());
+    }
+
+    @Test
+    public void testAppendEntries() throws IOException {
+        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+        log.open();
+        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3);
+        OperationEntry first = log.firstEntry();
+        OperationEntry last = log.lastEntry();
+        new EqualsTester()
+            .addEqualityGroup(first, TEST_ENTRY1)
+            .addEqualityGroup(last, TEST_ENTRY3)
+            .testEquals();
+        Assert.assertEquals(TEST_ENTRY1_SIZE + TEST_ENTRY2_SIZE, TEST_ENTRY3_SIZE, log.size());
+        Assert.assertEquals(1, log.firstIndex());
+        Assert.assertEquals(3, log.lastIndex());
+        Assert.assertTrue(log.containsEntry(1));
+        Assert.assertTrue(log.containsEntry(2));
+    }
+
+    @Test
+    public void testDelete() throws IOException {
+        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+        log.open();
+        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2);
+        log.delete();
+        Assert.assertEquals(0, log.size());
+        Assert.assertTrue(log.isEmpty());
+        Assert.assertEquals(0, log.firstIndex());
+        Assert.assertNull(log.firstEntry());
+        Assert.assertEquals(0, log.lastIndex());
+        Assert.assertNull(log.lastEntry());
+    }
+
+    @Test
+    public void testGetEntries() throws IOException {
+        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+        log.open();
+        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
+        Assert.assertEquals(
+                TEST_ENTRY1_SIZE +
+                TEST_ENTRY2_SIZE +
+                TEST_ENTRY3_SIZE +
+                TEST_ENTRY4_SIZE, log.size());
+
+        List<Entry> entries = log.getEntries(2, 3);
+        new EqualsTester()
+            .addEqualityGroup(log.getEntry(4), TEST_ENTRY4)
+            .addEqualityGroup(entries.get(0), TEST_ENTRY2)
+            .addEqualityGroup(entries.get(1), TEST_ENTRY3)
+            .testEquals();
+    }
+
+    @Test
+    public void testRemoveAfter() throws IOException {
+        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+        log.open();
+        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
+        log.removeAfter(1);
+        Assert.assertEquals(TEST_ENTRY1_SIZE, log.size());
+        new EqualsTester()
+            .addEqualityGroup(log.firstEntry(), log.lastEntry(), TEST_ENTRY1)
+            .testEquals();
+    }
+
+    @Test
+    public void testAddAfterRemove() throws IOException {
+        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+        log.open();
+        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
+        log.removeAfter(1);
+        log.appendEntry(TEST_ENTRY4);
+        Assert.assertEquals(TEST_ENTRY1_SIZE + TEST_ENTRY4_SIZE, log.size());
+        new EqualsTester()
+            .addEqualityGroup(log.firstEntry(), TEST_ENTRY1)
+            .addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
+            .addEqualityGroup(log.size(), TEST_ENTRY1_SIZE + TEST_ENTRY4_SIZE)
+            .testEquals();
+    }
+
+    @Test
+    public void testClose() throws IOException {
+        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+        Assert.assertFalse(log.isOpen());
+        log.open();
+        Assert.assertTrue(log.isOpen());
+        log.close();
+        Assert.assertFalse(log.isOpen());
+    }
+
+    @Test
+    public void testReopen() throws IOException {
+        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+        log.open();
+        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
+        log.close();
+        log.open();
+
+        new EqualsTester()
+            .addEqualityGroup(log.firstEntry(), TEST_ENTRY1)
+            .addEqualityGroup(log.getEntry(2), TEST_ENTRY2)
+            .addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
+            .addEqualityGroup(log.size(),
+                    TEST_ENTRY1_SIZE +
+                    TEST_ENTRY2_SIZE +
+                    TEST_ENTRY3_SIZE +
+                    TEST_ENTRY4_SIZE)
+            .testEquals();
+    }
+
+    @Test
+    public void testCompact() throws IOException {
+        Log log = new MapDBLog(new File(DB_FILE_NAME), SERIALIZER);
+        log.open();
+        log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
+        log.compact(3, TEST_SNAPSHOT_ENTRY);
+        new EqualsTester()
+        .addEqualityGroup(log.firstEntry(), TEST_SNAPSHOT_ENTRY)
+        .addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
+        .addEqualityGroup(log.size(),
+                TEST_SNAPSHOT_ENTRY_SIZE +
+                TEST_ENTRY4_SIZE)
+        .testEquals();
+    }
+}