Add persistence option to ECMap

Work towards ONOS-1337

Change-Id: I24e6a42e2f8856b363e79786829c51344797b81b
diff --git a/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapBuilder.java b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapBuilder.java
index 779c329..75ac27d 100644
--- a/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMapBuilder.java
@@ -166,6 +166,16 @@
     public EventuallyConsistentMapBuilder<K, V> withFasterConvergence();
 
     /**
+     * Configure the map to persist data to disk.
+     * <p>
+     * The default behavior is no persistence
+     * </p>
+     *
+     * @return this EventuallyConsistentMapBuilder
+     */
+    public EventuallyConsistentMapBuilder<K, V> withPersistence();
+
+    /**
      * Builds an eventually consistent map based on the configuration options
      * supplied to this builder.
      *
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index 2c9e4a9..7384988 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -65,7 +65,7 @@
         <dependency>
             <groupId>org.mapdb</groupId>
             <artifactId>mapdb</artifactId>
-            <version>1.0.6</version>
+	    <version>1.0.7</version>
         </dependency>
 
         <dependency>
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
index 9d60143..23b219b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
@@ -51,6 +51,7 @@
     private long antiEntropyPeriod = 5;
     private TimeUnit antiEntropyTimeUnit = TimeUnit.SECONDS;
     private boolean convergeFaster = false;
+    private boolean persistent = false;
 
     /**
      * Creates a new eventually consistent map builder.
@@ -131,6 +132,12 @@
     }
 
     @Override
+    public EventuallyConsistentMapBuilder<K, V> withPersistence() {
+        persistent = true;
+        return this;
+    }
+
+    @Override
     public EventuallyConsistentMap<K, V> build() {
         checkNotNull(name, "name is a mandatory parameter");
         checkNotNull(serializerBuilder, "serializerBuilder is a mandatory parameter");
@@ -148,6 +155,7 @@
                                                  tombstonesDisabled,
                                                  antiEntropyPeriod,
                                                  antiEntropyTimeUnit,
-                                                 convergeFaster);
+                                                 convergeFaster,
+                                                 persistent);
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 99eef3d..33e4251 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -93,7 +93,6 @@
             = new CopyOnWriteArraySet<>();
 
     private final ExecutorService executor;
-
     private final ScheduledExecutorService backgroundExecutor;
     private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
 
@@ -116,6 +115,9 @@
     private static final int LOAD_WINDOW = 2;
     private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
 
+    private final boolean persistent;
+    private final PersistentStore<K, V> persistentStore;
+
     /**
      * Creates a new eventually consistent map shared amongst multiple instances.
      * <p>
@@ -140,9 +142,9 @@
      * @param tombstonesDisabled    true if this map should not maintain
      *                              tombstones
      * @param antiEntropyPeriod     period that the anti-entropy task should run
-     *                              in seconds
-     * @param antiEntropyTimeUnit   time unit for anti-entropy task scheduling
+     * @param antiEntropyTimeUnit   time unit for anti-entropy period
      * @param convergeFaster        make anti-entropy try to converge faster
+     * @param persistent            persist data to disk
      */
     EventuallyConsistentMapImpl(String mapName,
                                 ClusterService clusterService,
@@ -156,7 +158,8 @@
                                 boolean tombstonesDisabled,
                                 long antiEntropyPeriod,
                                 TimeUnit antiEntropyTimeUnit,
-                                boolean convergeFaster) {
+                                boolean convergeFaster,
+                                boolean persistent) {
         items = new ConcurrentHashMap<>();
         removedItems = new ConcurrentHashMap<>();
         senderPending = Maps.newConcurrentMap();
@@ -195,6 +198,21 @@
                     newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
         }
 
+        this.persistent = persistent;
+
+        if (this.persistent) {
+            String dataDirectory = System.getProperty("karaf.data", "./data");
+            String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
+
+            ExecutorService dbExecutor =
+                    newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
+
+            persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
+            persistentStore.readInto(items, removedItems);
+        } else {
+            this.persistentStore = null;
+        }
+
         if (backgroundExecutor != null) {
             this.backgroundExecutor = backgroundExecutor;
         } else {
@@ -232,6 +250,7 @@
                         .register(ArrayList.class)
                         .register(AntiEntropyAdvertisement.class)
                         .register(HashMap.class)
+                        .register(Timestamped.class)
                         .build();
             }
         };
@@ -321,6 +340,11 @@
         if (success && removed != null) {
             removedItems.remove(key, removed);
         }
+
+        if (success && persistent) {
+            persistentStore.put(key, value, timestamp);
+        }
+
         return success;
     }
 
@@ -357,24 +381,29 @@
                 // remove from items map
                 return null;
             }
-            });
+        });
 
         if (updated.isFalse()) {
             return false;
         }
 
+        boolean updatedTombstone = false;
+
         if (!tombstonesDisabled) {
             Timestamp removedTimestamp = removedItems.get(key);
             if (removedTimestamp == null) {
-                return removedItems.putIfAbsent(key, timestamp) == null;
+                //Timestamp removed = removedItems.putIfAbsent(key, timestamp);
+                updatedTombstone = (removedItems.putIfAbsent(key, timestamp) == null);
             } else if (timestamp.isNewerThan(removedTimestamp)) {
-                return removedItems.replace(key, removedTimestamp, timestamp);
-            } else {
-                return false;
+                updatedTombstone = removedItems.replace(key, removedTimestamp, timestamp);
             }
         }
 
-        return updated.booleanValue();
+        if (updated.booleanValue() && persistent) {
+            persistentStore.remove(key, timestamp);
+        }
+
+        return (!tombstonesDisabled && updatedTombstone) || updated.booleanValue();
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java
new file mode 100644
index 0000000..f803bb8
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/MapDbPersistentStore.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.ecmap;
+
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.mapdb.Hasher;
+import org.mapdb.Serializer;
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.impl.Timestamped;
+import org.onosproject.store.serializers.KryoSerializer;
+
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * MapDB based implementation of a persistent store.
+ */
+class MapDbPersistentStore<K, V> implements PersistentStore<K, V> {
+
+    private final ExecutorService executor;
+    private final KryoSerializer serializer;
+
+    private final DB database;
+
+    private final Map<byte[], byte[]> items;
+    private final Map<byte[], byte[]> tombstones;
+
+    /**
+     * Creates a new MapDB based persistent store.
+     *
+     * @param filename filename of the database on disk
+     * @param executor executor to use for tasks that write to the disk
+     * @param serializer serializer for keys and values
+     */
+    MapDbPersistentStore(String filename, ExecutorService executor,
+                         KryoSerializer serializer) {
+        this.executor = checkNotNull(executor);
+        this.serializer = checkNotNull(serializer);
+
+        File databaseFile = new File(filename);
+
+        database = DBMaker.newFileDB(databaseFile).make();
+
+        items = database.createHashMap("items")
+                .keySerializer(Serializer.BYTE_ARRAY)
+                .valueSerializer(Serializer.BYTE_ARRAY)
+                .hasher(Hasher.BYTE_ARRAY)
+                .makeOrGet();
+
+        tombstones = database.createHashMap("tombstones")
+                .keySerializer(Serializer.BYTE_ARRAY)
+                .valueSerializer(Serializer.BYTE_ARRAY)
+                .hasher(Hasher.BYTE_ARRAY)
+                .makeOrGet();
+    }
+
+    @Override
+    public void readInto(Map<K, Timestamped<V>> items, Map<K, Timestamp> tombstones) {
+        this.items.forEach((keyBytes, valueBytes) ->
+                              items.put(serializer.decode(keyBytes),
+                                               serializer.decode(valueBytes)));
+
+        this.tombstones.forEach((keyBytes, valueBytes) ->
+                                   tombstones.put(serializer.decode(keyBytes),
+                                                    serializer.decode(valueBytes)));
+    }
+
+    @Override
+    public void put(K key, V value, Timestamp timestamp) {
+        executor.submit(() -> putInternal(key, value, timestamp));
+    }
+
+    private void putInternal(K key, V value, Timestamp timestamp) {
+        byte[] keyBytes = serializer.encode(key);
+        byte[] removedBytes = tombstones.get(keyBytes);
+
+        Timestamp removed = removedBytes == null ? null :
+                            serializer.decode(removedBytes);
+        if (removed != null && removed.isNewerThan(timestamp)) {
+            return;
+        }
+
+        final MutableBoolean updated = new MutableBoolean(false);
+
+        items.compute(keyBytes, (k, existingBytes) -> {
+            Timestamped<V> existing = existingBytes == null ? null :
+                                      serializer.decode(existingBytes);
+            if (existing != null && existing.isNewerThan(timestamp)) {
+                updated.setFalse();
+                return existingBytes;
+            } else {
+                updated.setTrue();
+                return serializer.encode(new Timestamped<>(value, timestamp));
+            }
+        });
+
+        boolean success = updated.booleanValue();
+
+        if (success && removed != null) {
+            tombstones.remove(keyBytes, removedBytes);
+        }
+
+        database.commit();
+    }
+
+    @Override
+    public void remove(K key, Timestamp timestamp) {
+        executor.submit(() -> removeInternal(key, timestamp));
+    }
+
+    private void removeInternal(K key, Timestamp timestamp) {
+        byte[] keyBytes = serializer.encode(key);
+
+        final MutableBoolean updated = new MutableBoolean(false);
+
+        items.compute(keyBytes, (k, existingBytes) -> {
+            Timestamp existing = existingBytes == null ? null :
+                                 serializer.decode(existingBytes);
+            if (existing != null && existing.isNewerThan(timestamp)) {
+                updated.setFalse();
+                return existingBytes;
+            } else {
+                updated.setTrue();
+                // remove from items map
+                return null;
+            }
+        });
+
+        if (!updated.booleanValue()) {
+            return;
+        }
+
+        byte[] timestampBytes = serializer.encode(timestamp);
+        byte[] removedBytes = tombstones.get(keyBytes);
+
+        Timestamp removedTimestamp = removedBytes == null ? null :
+                                     serializer.decode(removedBytes);
+        if (removedTimestamp == null) {
+            tombstones.putIfAbsent(keyBytes, timestampBytes);
+        } else if (timestamp.isNewerThan(removedTimestamp)) {
+            tombstones.replace(keyBytes, removedBytes, timestampBytes);
+        }
+
+        database.commit();
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java
new file mode 100644
index 0000000..b945f93
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/PersistentStore.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.ecmap;
+
+import org.onosproject.store.Timestamp;
+import org.onosproject.store.impl.Timestamped;
+
+import java.util.Map;
+
+/**
+ * A persistent store for an eventually consistent map.
+ */
+interface PersistentStore<K, V> {
+
+    /**
+     * Read the contents of the disk into the given maps.
+     *
+     * @param items items map
+     * @param tombstones tombstones map
+     */
+    void readInto(Map<K, Timestamped<V>> items, Map<K, Timestamp> tombstones);
+
+    /**
+     * Puts a new key,value pair into the map on disk.
+     *
+     * @param key the key
+     * @param value the value
+     * @param timestamp the timestamp of the update
+     */
+    void put(K key, V value, Timestamp timestamp);
+
+    /**
+     * Removes a key from the map on disk.
+     *
+     * @param key the key
+     * @param timestamp the timestamp of the update
+     */
+    void remove(K key, Timestamp timestamp);
+}
diff --git a/features/features.xml b/features/features.xml
index a345962..a132cf2 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -64,7 +64,7 @@
         <bundle>mvn:com.typesafe/config/1.2.1</bundle>
         <bundle>mvn:org.onosproject/onlab-thirdparty/@ONOS-VERSION</bundle>
 
-        <bundle>mvn:org.mapdb/mapdb/1.0.6</bundle>
+        <bundle>mvn:org.mapdb/mapdb/1.0.7</bundle>
     </feature>
 
     <feature name="onos-thirdparty-web" version="@FEATURE-VERSION"