Modifying eventually consistent map and tests to make use of the persistence service.

Change-Id: I44ffcabb9d765a1c70c2790366c6d7381416dac6
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestEventuallyConsistentMap.java b/core/api/src/test/java/org/onosproject/store/service/TestEventuallyConsistentMap.java
index 4f612de..02462e8 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestEventuallyConsistentMap.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestEventuallyConsistentMap.java
@@ -30,7 +30,8 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.store.Timestamp;
 
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.*;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
 
 /**
  * Testing version of an Eventually Consistent Map.
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index f2ec2a7..0b8b72b 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -69,6 +69,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-persistence</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.mapdb</groupId>
             <artifactId>mapdb</artifactId>
             <version>1.0.8</version>
@@ -110,5 +116,4 @@
             <artifactId>onlab-thirdparty</artifactId>
         </dependency>
     </dependencies>
-
 </project>
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 3e89635..90d81ee 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -55,6 +55,7 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.IdGenerator;
+import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
 import org.onosproject.store.service.AtomicCounterBuilder;
@@ -128,6 +129,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected PersistenceService persistenceService;
+
     protected String nodeIdToUri(NodeId nodeId) {
         ControllerNode node = clusterService.getNode(nodeId);
         return String.format("onos://%s:%d", node.ip(), node.tcpPort());
@@ -312,7 +316,8 @@
     @Override
     public <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder() {
         return new EventuallyConsistentMapBuilderImpl<>(clusterService,
-                                                        clusterCommunicator);
+                                                        clusterCommunicator,
+                                                        persistenceService);
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
index a553fff..eb98c82 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
@@ -18,6 +18,7 @@
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.Timestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.service.EventuallyConsistentMap;
@@ -52,6 +53,8 @@
     private TimeUnit antiEntropyTimeUnit = TimeUnit.SECONDS;
     private boolean convergeFaster = false;
     private boolean persistent = false;
+    private boolean persistentMap = false;
+    private final PersistenceService persistenceService;
 
     /**
      * Creates a new eventually consistent map builder.
@@ -60,7 +63,9 @@
      * @param clusterCommunicator cluster communication service
      */
     public EventuallyConsistentMapBuilderImpl(ClusterService clusterService,
-                                              ClusterCommunicationService clusterCommunicator) {
+                                              ClusterCommunicationService clusterCommunicator,
+                                              PersistenceService persistenceService) {
+        this.persistenceService = persistenceService;
         this.clusterService = checkNotNull(clusterService);
         this.clusterCommunicator = checkNotNull(clusterCommunicator);
     }
@@ -133,6 +138,7 @@
 
     @Override
     public EventuallyConsistentMapBuilder<K, V> withPersistence() {
+        checkNotNull(this.persistenceService);
         persistent = true;
         return this;
     }
@@ -156,6 +162,7 @@
                                                  antiEntropyPeriod,
                                                  antiEntropyTimeUnit,
                                                  convergeFaster,
-                                                 persistent);
+                                                 persistent,
+                                                 persistenceService);
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index f1e0dbd..b5ea52e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -28,6 +28,7 @@
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.persistence.PersistenceService;
 import org.onosproject.store.Timestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.MessageSubject;
@@ -37,6 +38,7 @@
 import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.EventuallyConsistentMapEvent;
 import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.WallClockTimestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,6 +83,7 @@
     private final ClusterCommunicationService clusterCommunicator;
     private final KryoSerializer serializer;
     private final NodeId localNodeId;
+    private final PersistenceService persistenceService;
 
     private final BiFunction<K, V, Timestamp> timestampProvider;
 
@@ -116,7 +119,9 @@
     private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
 
     private final boolean persistent;
-    private final PersistentStore<K, V> persistentStore;
+
+    private static final String PERSISTENT_LOCAL_MAP_NAME = "itemsMap";
+
 
     /**
      * Creates a new eventually consistent map shared amongst multiple instances.
@@ -158,9 +163,32 @@
                                 long antiEntropyPeriod,
                                 TimeUnit antiEntropyTimeUnit,
                                 boolean convergeFaster,
-                                boolean persistent) {
+                                boolean persistent,
+                                PersistenceService persistenceService) {
         this.mapName = mapName;
-        items = Maps.newConcurrentMap();
+        this.serializer = createSerializer(serializerBuilder);
+        this.persistenceService = persistenceService;
+        this.persistent =
+                persistent;
+        if (persistent) {
+            items = this.persistenceService.<K, MapValue<V>>persistentMapBuilder()
+                    .withName(PERSISTENT_LOCAL_MAP_NAME)
+                    .withSerializer(new Serializer() {
+
+                        @Override
+                        public <T> byte[] encode(T object) {
+                            return EventuallyConsistentMapImpl.this.serializer.encode(object);
+                        }
+
+                        @Override
+                        public <T> T decode(byte[] bytes) {
+                            return EventuallyConsistentMapImpl.this.serializer.decode(bytes);
+                        }
+                    })
+                    .build();
+        } else {
+            items = Maps.newConcurrentMap();
+        }
         senderPending = Maps.newConcurrentMap();
         destroyedMessage = mapName + ERROR_DESTROYED;
 
@@ -168,8 +196,6 @@
         this.clusterCommunicator = clusterCommunicator;
         this.localNodeId = clusterService.getLocalNode().id();
 
-        this.serializer = createSerializer(serializerBuilder);
-
         this.timestampProvider = timestampProvider;
 
         if (peerUpdateFunction != null) {
@@ -198,20 +224,6 @@
                     newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
         }
 
-        this.persistent = persistent;
-
-        if (this.persistent) {
-            String dataDirectory = System.getProperty("karaf.data", "./data");
-            String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
-
-            ExecutorService dbExecutor =
-                    newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
-
-            persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
-            persistentStore.readInto(items);
-        } else {
-            this.persistentStore = null;
-        }
 
         if (backgroundExecutor != null) {
             this.backgroundExecutor = backgroundExecutor;
@@ -373,15 +385,6 @@
                 return existing;
             }
         });
-        if (updated.get()) {
-            if (persistent) {
-                if (tombstone.isPresent()) {
-                    persistentStore.update(key, tombstone.get());
-                } else {
-                    persistentStore.remove(key);
-                }
-            }
-        }
         return previousValue.get();
     }
 
@@ -455,6 +458,7 @@
 
     /**
      * Returns true if newValue was accepted i.e. map is updated.
+     *
      * @param key key
      * @param newValue proposed new value
      * @return true if update happened; false if map already contains a more recent value for the key
@@ -473,9 +477,6 @@
             }
             return existing;
         });
-        if (updated.get() && persistent) {
-            persistentStore.update(key, newValue);
-        }
         return updated.get();
     }
 
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index ccf6ee7..ef8d992 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -42,6 +42,7 @@
 import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.event.AbstractEvent;
+import org.onosproject.persistence.impl.PersistenceManager;
 import org.onosproject.store.Timestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
@@ -81,6 +82,7 @@
 
     private EventuallyConsistentMap<String, String> ecMap;
 
+    private PersistenceManager persistenceService;
     private ClusterService clusterService;
     private ClusterCommunicationService clusterCommunicator;
     private SequentialClockService<String, String> clockService;
@@ -136,6 +138,8 @@
 
         clusterCommunicator = createMock(ClusterCommunicationService.class);
 
+        persistenceService = new PersistenceManager();
+        persistenceService.activate();
         // Add expectation for adding cluster message subscribers which
         // delegate to our ClusterCommunicationService implementation. This
         // allows us to get a reference to the map's internal cluster message
@@ -153,11 +157,12 @@
                 .register(TestTimestamp.class);
 
         ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
-                        clusterService, clusterCommunicator)
+                        clusterService, clusterCommunicator, persistenceService)
                 .withName(MAP_NAME)
                 .withSerializer(serializer)
                 .withTimestampProvider((k, v) -> clockService.getTimestamp(k, v))
                 .withCommunicationExecutor(MoreExecutors.newDirectExecutorService())
+                .withPersistence()
                 .build();
 
         // Reset ready for tests to add their own expectations