Add persistence option to ECMap

Work towards ONOS-1337

Change-Id: I24e6a42e2f8856b363e79786829c51344797b81b
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index 99eef3d..33e4251 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -93,7 +93,6 @@
             = new CopyOnWriteArraySet<>();
 
     private final ExecutorService executor;
-
     private final ScheduledExecutorService backgroundExecutor;
     private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
 
@@ -116,6 +115,9 @@
     private static final int LOAD_WINDOW = 2;
     private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
 
+    private final boolean persistent;
+    private final PersistentStore<K, V> persistentStore;
+
     /**
      * Creates a new eventually consistent map shared amongst multiple instances.
      * <p>
@@ -140,9 +142,9 @@
      * @param tombstonesDisabled    true if this map should not maintain
      *                              tombstones
      * @param antiEntropyPeriod     period that the anti-entropy task should run
-     *                              in seconds
-     * @param antiEntropyTimeUnit   time unit for anti-entropy task scheduling
+     * @param antiEntropyTimeUnit   time unit for anti-entropy period
      * @param convergeFaster        make anti-entropy try to converge faster
+     * @param persistent            true if this map should persist its data to disk
      */
     EventuallyConsistentMapImpl(String mapName,
                                 ClusterService clusterService,
@@ -156,7 +158,8 @@
                                 boolean tombstonesDisabled,
                                 long antiEntropyPeriod,
                                 TimeUnit antiEntropyTimeUnit,
-                                boolean convergeFaster) {
+                                boolean convergeFaster,
+                                boolean persistent) {
         items = new ConcurrentHashMap<>();
         removedItems = new ConcurrentHashMap<>();
         senderPending = Maps.newConcurrentMap();
@@ -195,6 +198,21 @@
                     newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
         }
 
+        this.persistent = persistent;
+
+        if (this.persistent) {
+            String dataDirectory = System.getProperty("karaf.data", "./data");
+            String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
+
+            ExecutorService dbExecutor =
+                    newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
+
+            persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
+            persistentStore.readInto(items, removedItems);
+        } else {
+            this.persistentStore = null;
+        }
+
         if (backgroundExecutor != null) {
             this.backgroundExecutor = backgroundExecutor;
         } else {
@@ -232,6 +250,7 @@
                         .register(ArrayList.class)
                         .register(AntiEntropyAdvertisement.class)
                         .register(HashMap.class)
+                        .register(Timestamped.class)
                         .build();
             }
         };
@@ -321,6 +340,11 @@
         if (success && removed != null) {
             removedItems.remove(key, removed);
         }
+
+        if (success && persistent) {
+            persistentStore.put(key, value, timestamp);
+        }
+
         return success;
     }
 
@@ -357,24 +381,29 @@
                 // remove from items map
                 return null;
             }
-            });
+        });
 
         if (updated.isFalse()) {
             return false;
         }
 
+        boolean updatedTombstone = false;
+
         if (!tombstonesDisabled) {
             Timestamp removedTimestamp = removedItems.get(key);
             if (removedTimestamp == null) {
-                return removedItems.putIfAbsent(key, timestamp) == null;
+                // No tombstone recorded for this key yet; add one if still absent
+                updatedTombstone = (removedItems.putIfAbsent(key, timestamp) == null);
             } else if (timestamp.isNewerThan(removedTimestamp)) {
-                return removedItems.replace(key, removedTimestamp, timestamp);
-            } else {
-                return false;
+                updatedTombstone = removedItems.replace(key, removedTimestamp, timestamp);
             }
         }
 
-        return updated.booleanValue();
+        if (updated.booleanValue() && persistent) {
+            persistentStore.remove(key, timestamp);
+        }
+
+        return (!tombstonesDisabled && updatedTombstone) || updated.booleanValue();
     }
 
     @Override