MapDB-backed Copycat log implementation
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
new file mode 100644
index 0000000..893c311
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
@@ -0,0 +1,280 @@
+package org.onlab.onos.store.service.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentNavigableMap;
+
+import net.kuujo.copycat.log.Entry;
+import net.kuujo.copycat.log.Log;
+import net.kuujo.copycat.log.LogIndexOutOfBoundsException;
+
+import org.mapdb.Atomic;
+import org.mapdb.BTreeMap;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.TxBlock;
+import org.mapdb.TxMaker;
+import org.onlab.onos.store.serializers.StoreSerializer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Copycat {@link Log} implementation that persists entries in a MapDB file database.
+ */
+public class MapDBLog implements Log {
+
+    private final File dbFile; // file handed to DBMaker.newFileDB() in open()
+    private TxMaker txMaker; // non-null only between open() and close()
+    private final StoreSerializer serializer; // encodes entries to byte[] and decodes them back
+    private static final String LOG_NAME = "log"; // name of the tree map holding index -> encoded entry
+    private static final String SIZE_FIELD_NAME = "size"; // name of the atomic long summing encoded entry bytes
+
+    public MapDBLog(File dbFile, StoreSerializer serializer) {
+        this.serializer = serializer; // converts entries to and from the stored byte arrays
+        this.dbFile = dbFile; // only recorded here; the database is created in open()
+    }
+
+    @Override
+    public void open() throws IOException {
+        // Opening twice would overwrite txMaker and leak the store that is already open.
+        checkState(!isOpen(), "The log is already open.");
+        txMaker = DBMaker.newFileDB(dbFile).makeTxMaker(); // transactional store backed by dbFile
+    }
+
+    @Override
+    public void close() throws IOException {
+        assertIsOpen(); // closing a log that is not open fails with IllegalStateException
+        txMaker.close();
+        txMaker = null; // isOpen() checks this field, so the log now reports closed
+    }
+
+    @Override
+    public boolean isOpen() {
+        return txMaker != null; // txMaker is set by open() and cleared by close()
+    }
+
+    protected void assertIsOpen() {
+        checkState(isOpen(), "The log is not currently open."); // fails fast when txMaker is null
+    }
+
+    @Override
+    public long appendEntry(Entry entry) {
+        checkArgument(entry != null, "expecting non-null entry");
+        return appendEntries(new Entry[] {entry}).get(0); // one-element batch, so exactly one index comes back
+    }
+
+    @Override
+    public List<Long> appendEntries(Entry... entries) {
+        checkArgument(entries != null, "expecting non-null entries");
+        return appendEntries(Arrays.asList(entries)); // hands the array, viewed as a List, to the List overload
+    }
+
+    @Override
+    public List<Long> appendEntries(List<Entry> entries) {
+        assertIsOpen();
+        checkArgument(entries != null, "expecting non-null entries");
+        final List<Long> indices = Lists.newArrayList();
+        // All entries of the batch are written in one transaction and get consecutive indices.
+        txMaker.execute(new TxBlock() {
+            @Override
+            public void tx(DB db) {
+                indices.clear(); // execute() retries on rollback; discard indices from the failed run
+                BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+                Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+                // An empty log starts at index 1; otherwise numbering continues after the highest key.
+                long nextIndex = log.isEmpty() ? 1 : log.lastKey() + 1;
+                for (Entry entry : entries) {
+                    byte[] entryBytes = serializer.encode(entry);
+                    log.put(nextIndex, entryBytes);
+                    size.addAndGet(entryBytes.length); // size tracks encoded bytes, not entry count
+                    indices.add(nextIndex++);
+                }
+            }
+        });
+        return indices;
+    }
+
+    @Override
+    public boolean containsEntry(long index) {
+        assertIsOpen();
+        final DB tx = txMaker.makeTx(); // transaction used only for this lookup
+        try {
+            final BTreeMap<Long, byte[]> entriesByIndex = tx.getTreeMap(LOG_NAME);
+            return entriesByIndex.containsKey(index);
+        } finally {
+            tx.close(); // released on every path, including exceptions
+        }
+    }
+
+    @Override
+    public void delete() throws IOException {
+        assertIsOpen();
+        // Both the entry map and the byte counter are reset inside the same transaction.
+        txMaker.execute(new TxBlock() {
+            @Override
+            public void tx(DB db) {
+                // Afterwards the log holds no keys and reports a size of zero bytes.
+                db.getTreeMap(LOG_NAME).clear();
+                db.getAtomicLong(SIZE_FIELD_NAME).set(0);
+            }
+        });
+    }
+
+    @Override
+    public <T extends Entry> T firstEntry() {
+        assertIsOpen();
+        final DB tx = txMaker.makeTx();
+        try {
+            final BTreeMap<Long, byte[]> stored = tx.getTreeMap(LOG_NAME);
+            return !stored.isEmpty() ? serializer.decode(stored.firstEntry().getValue()) : null; // null when empty
+        } finally {
+            tx.close();
+        }
+    }
+
+    @Override
+    public long firstIndex() {
+        assertIsOpen();
+        final DB tx = txMaker.makeTx();
+        try {
+            final BTreeMap<Long, byte[]> stored = tx.getTreeMap(LOG_NAME);
+            return !stored.isEmpty() ? stored.firstKey() : 0; // 0 is returned for an empty log
+        } finally {
+            tx.close();
+        }
+    }
+
+    @Override
+    public <T extends Entry> List<T> getEntries(long from, long to) {
+        assertIsOpen();
+        final DB tx = txMaker.makeTx();
+        try {
+            final BTreeMap<Long, byte[]> stored = tx.getTreeMap(LOG_NAME);
+            // Both bounds are inclusive and must lie within [firstKey, lastKey] of a non-empty log.
+            if (stored.isEmpty()) {
+                throw new LogIndexOutOfBoundsException("Log is empty");
+            } else if (stored.firstKey() > from) {
+                throw new LogIndexOutOfBoundsException("From index out of bounds.");
+            } else if (stored.lastKey() < to) {
+                throw new LogIndexOutOfBoundsException("To index out of bounds.");
+            }
+            final List<T> decoded = new ArrayList<>((int) (to - from + 1));
+            for (long index = from; index <= to; index++) {
+                decoded.add(serializer.decode(stored.get(index)));
+            }
+            return decoded;
+        } finally {
+            tx.close();
+        }
+    }
+
+    @Override
+    public <T extends Entry> T getEntry(long index) {
+        assertIsOpen();
+        final DB tx = txMaker.makeTx();
+        try {
+            final BTreeMap<Long, byte[]> stored = tx.getTreeMap(LOG_NAME);
+            final byte[] encoded = stored.get(index);
+            return encoded != null ? serializer.decode(encoded) : null; // null when no entry has that index
+        } finally {
+            tx.close();
+        }
+    }
+
+    @Override
+    public boolean isEmpty() {
+        assertIsOpen();
+        final DB tx = txMaker.makeTx();
+        try {
+            // Emptiness is decided by the entry map alone; the byte counter is not consulted.
+            return tx.getTreeMap(LOG_NAME).isEmpty();
+        } finally {
+            tx.close();
+        }
+    }
+
+    @Override
+    public <T extends Entry> T lastEntry() {
+        assertIsOpen();
+        final DB tx = txMaker.makeTx();
+        try {
+            final BTreeMap<Long, byte[]> stored = tx.getTreeMap(LOG_NAME);
+            return !stored.isEmpty() ? serializer.decode(stored.lastEntry().getValue()) : null; // null when empty
+        } finally {
+            tx.close();
+        }
+    }
+
+    @Override
+    public long lastIndex() {
+        assertIsOpen();
+        final DB tx = txMaker.makeTx();
+        try {
+            final BTreeMap<Long, byte[]> stored = tx.getTreeMap(LOG_NAME);
+            return !stored.isEmpty() ? stored.lastKey() : 0; // 0 is returned for an empty log
+        } finally {
+            tx.close();
+        }
+    }
+
+    @Override
+    public void removeAfter(long index) {
+        assertIsOpen();
+        txMaker.execute(new TxBlock() { // removes every entry with a key above index, in one transaction
+            @Override
+            public void tx(DB db) {
+                BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+                Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+                // lastKey() throws on an empty map; using index as the end makes the loop a no-op then.
+                long endIndex = log.isEmpty() ? index : log.lastKey();
+                for (long i = index + 1; i <= endIndex; ++i) {
+                    byte[] entryBytes = log.remove(i);
+                    size.addAndGet(entryBytes == null ? 0L : -1L * entryBytes.length); // null: no entry at i
+                }
+            }
+        });
+    }
+
+    @Override
+    public long size() {
+        assertIsOpen();
+        final DB tx = txMaker.makeTx();
+        try {
+            // The value read is the counter stored under SIZE_FIELD_NAME, not the number of map keys.
+            return tx.getAtomicLong(SIZE_FIELD_NAME).get();
+        } finally {
+            tx.close();
+        }
+    }
+
+    @Override
+    public void sync() throws IOException {
+        assertIsOpen(); // only verifies the log is open; nothing is flushed explicitly here
+    }
+
+    @Override
+    public void compact(long index, Entry entry) throws IOException {
+        // Removes every entry below index and stores entry at index, all in one transaction.
+        assertIsOpen();
+        txMaker.execute(new TxBlock() {
+            @Override
+            public void tx(DB db) {
+                BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+                Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+                ConcurrentNavigableMap<Long, byte[]> headMap = log.headMap(index);
+                long deletedBytes = headMap.keySet().stream().mapToLong(i -> log.remove(i).length).sum();
+                byte[] entryBytes = serializer.encode(entry);
+                byte[] existingEntry = log.put(index, entryBytes); // null when index held no entry
+                long replacedBytes = existingEntry == null ? 0 : existingEntry.length;
+                size.addAndGet(entryBytes.length - replacedBytes - deletedBytes);
+                db.compact();
+            }
+        });
+    }
+}
\ No newline at end of file