Refactor: use distribute storage to store precommit data

Change-Id: I07e3d03475e00cfbe2a37c8eb2f25b47f6751d3e
diff --git a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/PreCommitPortManager.java b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/PreCommitPortManager.java
index a3aa0f4..e02ff35 100644
--- a/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/PreCommitPortManager.java
+++ b/apps/openstacknetworking/app/src/main/java/org/onosproject/openstacknetworking/impl/PreCommitPortManager.java
@@ -15,20 +15,31 @@
  */
 package org.onosproject.openstacknetworking.impl;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.openstacknetworking.api.OpenstackNetworkEvent;
 import org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type;
 import org.onosproject.openstacknetworking.api.PreCommitPortService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
+import static org.onosproject.openstacknetworking.api.Constants.OPENSTACK_NETWORKING_APP_ID;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -40,10 +51,30 @@
 
     protected final Logger log = getLogger(getClass());
 
-    private Map<String, Map<Type, Set<String>>> store = Maps.newConcurrentMap();
+    private static final KryoNamespace SERIALIZER_PRE_COMMIT = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(OpenstackNetworkEvent.Type.class)
+            .build();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    private ConsistentMap<String, Map<Type, Set<String>>> store;
 
     @Activate
     protected void activate() {
+
+        ApplicationId appId = coreService.registerApplication(OPENSTACK_NETWORKING_APP_ID);
+
+        store = storageService.<String, Map<Type, Set<String>>>consistentMapBuilder()
+                .withSerializer(Serializer.using(SERIALIZER_PRE_COMMIT))
+                .withName("openstack-pre-commit-store")
+                .withApplicationId(appId)
+                .build();
+
         log.info("Started");
     }
 
@@ -54,7 +85,7 @@
 
     @Override
     public void subscribePreCommit(String portId, Type eventType, String className) {
-        store.computeIfAbsent(portId, s -> Maps.newConcurrentMap());
+        store.computeIfAbsent(portId, s -> new HashMap<>());
 
         store.compute(portId, (k, v) -> {
 
@@ -62,8 +93,7 @@
                 return null;
             }
 
-            Objects.requireNonNull(v).computeIfAbsent(eventType,
-                                     s -> Sets.newConcurrentHashSet());
+            Objects.requireNonNull(v).computeIfAbsent(eventType, s -> new HashSet<>());
 
 
             Objects.requireNonNull(v).compute(eventType, (i, j) -> {
@@ -96,7 +126,7 @@
     @Override
     public int subscriberCountByEventType(String portId, Type eventType) {
 
-        Map<Type, Set<String>> typeMap = store.get(portId);
+        Map<Type, Set<String>> typeMap = store.asJavaMap().get(portId);
 
         if (typeMap == null || typeMap.isEmpty()) {
             return 0;
@@ -112,7 +142,7 @@
     @Override
     public int subscriberCount(String portId) {
 
-        Map<Type, Set<String>> typeMap = store.get(portId);
+        Map<Type, Set<String>> typeMap = store.asJavaMap().get(portId);
 
         if (typeMap == null || typeMap.isEmpty()) {
             return 0;
diff --git a/apps/openstacknetworking/app/src/test/java/org/onosproject/openstacknetworking/impl/PreCommitPortManagerTest.java b/apps/openstacknetworking/app/src/test/java/org/onosproject/openstacknetworking/impl/PreCommitPortManagerTest.java
index b6ba3f9..3b32fc0 100644
--- a/apps/openstacknetworking/app/src/test/java/org/onosproject/openstacknetworking/impl/PreCommitPortManagerTest.java
+++ b/apps/openstacknetworking/app/src/test/java/org/onosproject/openstacknetworking/impl/PreCommitPortManagerTest.java
@@ -18,6 +18,11 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.onlab.junit.TestUtils;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.store.service.TestStorageService;
 
 import static org.junit.Assert.assertEquals;
 import static org.onosproject.openstacknetworking.api.OpenstackNetworkEvent.Type.OPENSTACK_PORT_PRE_REMOVE;
@@ -28,6 +33,8 @@
  */
 public class PreCommitPortManagerTest {
 
+    private static final ApplicationId TEST_APP_ID = new DefaultApplicationId(1, "test");
+
     private static final String PORT_ID_1 = "port-1";
     private static final String PORT_ID_2 = "port-2";
 
@@ -42,6 +49,8 @@
     @Before
     public void setUp() {
         target = new PreCommitPortManager();
+        TestUtils.setField(target, "coreService", new TestCoreService());
+        TestUtils.setField(target, "storageService", new TestStorageService());
         target.activate();
     }
 
@@ -51,6 +60,7 @@
     @After
     public void tearDown() {
         target.deactivate();
+        target = null;
     }
 
     /**
@@ -98,4 +108,15 @@
 
         target.subscribePreCommit(PORT_ID_2, OPENSTACK_PORT_PRE_UPDATE, CLASS_NAME_1);
     }
+
+    /**
+     * Mocks CoreService.
+     */
+    private static class TestCoreService extends CoreServiceAdapter {
+
+        @Override
+        public ApplicationId registerApplication(String name) {
+            return TEST_APP_ID;
+        }
+    }
 }