MapDB backed Copycat log implementation
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
new file mode 100644
index 0000000..893c311
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
@@ -0,0 +1,280 @@
+package org.onlab.onos.store.service.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentNavigableMap;
+
+import net.kuujo.copycat.log.Entry;
+import net.kuujo.copycat.log.Log;
+import net.kuujo.copycat.log.LogIndexOutOfBoundsException;
+
+import org.mapdb.Atomic;
+import org.mapdb.BTreeMap;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.TxBlock;
+import org.mapdb.TxMaker;
+import org.onlab.onos.store.serializers.StoreSerializer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * MapDB based log implementation.
+ */
+public class MapDBLog implements Log {
+
+ private final File dbFile;
+ private TxMaker txMaker;
+ private final StoreSerializer serializer;
+ private static final String LOG_NAME = "log";
+ private static final String SIZE_FIELD_NAME = "size";
+
+ public MapDBLog(File dbFile, StoreSerializer serializer) {
+ this.dbFile = dbFile;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public void open() throws IOException {
+ txMaker = DBMaker
+ .newFileDB(dbFile)
+ .makeTxMaker();
+ }
+
+ @Override
+ public void close() throws IOException {
+ assertIsOpen();
+ txMaker.close();
+ txMaker = null;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return txMaker != null;
+ }
+
+ protected void assertIsOpen() {
+ checkState(isOpen(), "The log is not currently open.");
+ }
+
+ @Override
+ public long appendEntry(Entry entry) {
+ checkArgument(entry != null, "expecting non-null entry");
+ return appendEntries(entry).get(0);
+ }
+
+ @Override
+ public List<Long> appendEntries(Entry... entries) {
+ checkArgument(entries != null, "expecting non-null entries");
+ return appendEntries(Arrays.asList(entries));
+ }
+
+ @Override
+ public List<Long> appendEntries(List<Entry> entries) {
+ assertIsOpen();
+ checkArgument(entries != null, "expecting non-null entries");
+ final List<Long> indices = Lists.newArrayList();
+
+ txMaker.execute(new TxBlock() {
+ @Override
+ public void tx(DB db) {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+ long nextIndex = log.isEmpty() ? 1 : log.lastKey() + 1;
+ for (Entry entry : entries) {
+ byte[] entryBytes = serializer.encode(entry);
+ log.put(nextIndex, entryBytes);
+ size.addAndGet(entryBytes.length);
+ indices.add(nextIndex);
+ nextIndex++;
+ }
+ }
+ });
+
+ return indices;
+ }
+
+ @Override
+ public boolean containsEntry(long index) {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.containsKey(index);
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public void delete() throws IOException {
+ assertIsOpen();
+ txMaker.execute(new TxBlock() {
+ @Override
+ public void tx(DB db) {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+ log.clear();
+ size.set(0);
+ }
+ });
+ }
+
+ @Override
+ public <T extends Entry> T firstEntry() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.isEmpty() ? null : serializer.decode(log.firstEntry().getValue());
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public long firstIndex() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.isEmpty() ? 0 : log.firstKey();
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public <T extends Entry> List<T> getEntries(long from, long to) {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ if (log.isEmpty()) {
+ throw new LogIndexOutOfBoundsException("Log is empty");
+ } else if (from < log.firstKey()) {
+ throw new LogIndexOutOfBoundsException("From index out of bounds.");
+ } else if (to > log.lastKey()) {
+ throw new LogIndexOutOfBoundsException("To index out of bounds.");
+ }
+ List<T> entries = new ArrayList<>((int) (to - from + 1));
+ for (long i = from; i <= to; i++) {
+ T entry = serializer.decode(log.get(i));
+ entries.add(entry);
+ }
+ return entries;
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public <T extends Entry> T getEntry(long index) {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ byte[] entryBytes = log.get(index);
+ return entryBytes == null ? null : serializer.decode(entryBytes);
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.isEmpty();
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public <T extends Entry> T lastEntry() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.isEmpty() ? null : serializer.decode(log.lastEntry().getValue());
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public long lastIndex() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ return log.isEmpty() ? 0 : log.lastKey();
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public void removeAfter(long index) {
+ assertIsOpen();
+ txMaker.execute(new TxBlock() {
+ @Override
+ public void tx(DB db) {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+ long startIndex = index + 1;
+ long endIndex = log.lastKey();
+ for (long i = startIndex; i <= endIndex; ++i) {
+ byte[] entryBytes = log.remove(i);
+ size.addAndGet(-1L * entryBytes.length);
+ }
+ }
+ });
+ }
+
+ @Override
+ public long size() {
+ assertIsOpen();
+ DB db = txMaker.makeTx();
+ try {
+ Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+ return size.get();
+ } finally {
+ db.close();
+ }
+ }
+
+ @Override
+ public void sync() throws IOException {
+ assertIsOpen();
+ }
+
+ @Override
+ public void compact(long index, Entry entry) throws IOException {
+
+ assertIsOpen();
+ txMaker.execute(new TxBlock() {
+ @Override
+ public void tx(DB db) {
+ BTreeMap<Long, byte[]> log = db.getTreeMap(LOG_NAME);
+ Atomic.Long size = db.getAtomicLong(SIZE_FIELD_NAME);
+ ConcurrentNavigableMap<Long, byte[]> headMap = log.headMap(index);
+ long deletedBytes = headMap.keySet().stream().mapToLong(i -> log.remove(i).length).sum();
+ size.addAndGet(-1 * deletedBytes);
+ byte[] entryBytes = serializer.encode(entry);
+ byte[] existingEntry = log.put(index, entryBytes);
+ size.addAndGet(entryBytes.length - existingEntry.length);
+ db.compact();
+ }
+ });
+ }
+}
\ No newline at end of file