Modifying eventually consistent map and tests to make use of the persistence service.

Change-Id: I44ffcabb9d765a1c70c2790366c6d7381416dac6
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index f1e0dbd..b5ea52e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -28,6 +28,7 @@
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.Timestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.MessageSubject;
@@ -37,6 +38,7 @@
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMapEvent;
 import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.WallClockTimestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,6 +83,7 @@
     private final ClusterCommunicationService clusterCommunicator;
     private final KryoSerializer serializer;
     private final NodeId localNodeId;
+    private final PersistenceService persistenceService;
 
     private final BiFunction<K, V, Timestamp> timestampProvider;
 
@@ -116,7 +119,9 @@
     private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
 
     private final boolean persistent;
-    private final PersistentStore<K, V> persistentStore;
+
+    private static final String PERSISTENT_LOCAL_MAP_NAME = "itemsMap";
+
 
     /**
      * Creates a new eventually consistent map shared amongst multiple instances.
@@ -158,9 +163,32 @@
                                 long antiEntropyPeriod,
                                 TimeUnit antiEntropyTimeUnit,
                                 boolean convergeFaster,
-                                boolean persistent) {
+                                boolean persistent,
+                                PersistenceService persistenceService) {
         this.mapName = mapName;
-        items = Maps.newConcurrentMap();
+        this.serializer = createSerializer(serializerBuilder);
+        this.persistenceService = persistenceService;
+        this.persistent =
+                persistent;
+        if (persistent) {
+            // Qualify the store name with mapName so distinct persistent maps do not share one name.
+            items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
+                    .withName(PERSISTENT_LOCAL_MAP_NAME + "-" + mapName)
+                    .withSerializer(new Serializer() {
+                        @Override
+                        public <T> byte[] encode(T object) {
+                            return EventuallyConsistentMapImpl.this.serializer.encode(object);
+                        }
+
+                        @Override
+                        public <T> T decode(byte[] bytes) {
+                            return EventuallyConsistentMapImpl.this.serializer.decode(bytes);
+                        }
+                    })
+                    .build();
+        } else {
+            items = Maps.newConcurrentMap();
+        }
         senderPending = Maps.newConcurrentMap();
         destroyedMessage = mapName + ERROR_DESTROYED;
 
@@ -168,8 +196,6 @@
         this.clusterCommunicator = clusterCommunicator;
         this.localNodeId = clusterService.getLocalNode().id();
 
-        this.serializer = createSerializer(serializerBuilder);
-
         this.timestampProvider = timestampProvider;
 
         if (peerUpdateFunction != null) {
@@ -198,20 +224,6 @@
                     newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
         }
 
-        this.persistent = persistent;
-
-        if (this.persistent) {
-            String dataDirectory = System.getProperty("karaf.data", "./data");
-            String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
-
-            ExecutorService dbExecutor =
-                    newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
-
-            persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
-            persistentStore.readInto(items);
-        } else {
-            this.persistentStore = null;
-        }
 
         if (backgroundExecutor != null) {
             this.backgroundExecutor = backgroundExecutor;
@@ -373,15 +385,6 @@
                 return existing;
             }
         });
-        if (updated.get()) {
-            if (persistent) {
-                if (tombstone.isPresent()) {
-                    persistentStore.update(key, tombstone.get());
-                } else {
-                    persistentStore.remove(key);
-                }
-            }
-        }
         return previousValue.get();
     }
 
@@ -455,6 +458,7 @@
 
     /**
      * Returns true if newValue was accepted i.e. map is updated.
+     *
      * @param key key
      * @param newValue proposed new value
      * @return true if update happened; false if map already contains a more recent value for the key
@@ -473,9 +477,6 @@
             }
             return existing;
         });
-        if (updated.get() && persistent) {
-            persistentStore.update(key, newValue);
-        }
         return updated.get();
     }