[ONOS-6556] Distributed Implementation of PiPipeconfService

Change-Id: I7196ce6eee333e732c0cd5015d4d8d32ee069e27
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index 9072f3f..a0bf93e 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -44,6 +44,7 @@
 import org.onosproject.net.pi.model.PiPipeconf;
 import org.onosproject.net.pi.model.PiPipeconfId;
 import org.onosproject.net.pi.runtime.PiPipeconfConfig;
+import org.onosproject.net.pi.runtime.PiPipeconfMappingStore;
 import org.onosproject.net.pi.runtime.PiPipeconfService;
 import org.slf4j.Logger;
 
@@ -83,10 +84,11 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DriverAdminService driverAdminService;
 
-    //TODO move to replicated map
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected PiPipeconfMappingStore pipeconfMappingStore;
+
+    // Registered pipeconfs are replicated through the app subsystem and registered on app-activated events.
     protected ConcurrentHashMap<PiPipeconfId, PiPipeconf> piPipeconfs = new ConcurrentHashMap<>();
-    //TODO move to replicated map
-    protected ConcurrentHashMap<DeviceId, PiPipeconfId> devicesToPipeconf = new ConcurrentHashMap<>();
 
     protected ExecutorService executor =
             Executors.newFixedThreadPool(5, groupedThreads("onos/pipipeconfservice",
@@ -120,7 +122,6 @@
         cfgService.removeListener(cfgListener);
         cfgService.unregisterConfigFactory(factory);
         piPipeconfs.clear();
-        devicesToPipeconf.clear();
         cfgService = null;
         driverAdminService = null;
         driverService = null;
@@ -139,11 +140,12 @@
 
     @Override
     public void remove(PiPipeconfId pipeconfId) throws IllegalStateException {
-        //TODO move to the distributed mechanism
         //TODO add mechanism to remove from device.
         if (!piPipeconfs.containsKey(pipeconfId)) {
             throw new IllegalStateException(format("Pipeconf %s is not registered", pipeconfId));
         }
+        // TODO remove the binding from the distributed Store when the lifecycle of a pipeconf is defined.
+        // pipeconfMappingStore.removeBindings(pipeconfId);
         piPipeconfs.remove(pipeconfId);
     }
 
@@ -212,7 +214,7 @@
                 // Completable future is needed for when this method will also apply the pipeline to the device.
                 // FIXME (maybe): the pipeline is currently applied by the general device provider. But we store here
                 // the association between device and pipeconf.
-                devicesToPipeconf.put(deviceId, pipeconfId);
+                pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
                 operationResult.complete(true);
             }
         });
@@ -221,9 +223,10 @@
 
     @Override
     public Optional<PiPipeconfId> ofDevice(DeviceId deviceId) {
-        return Optional.ofNullable(devicesToPipeconf.get(deviceId));
+        return Optional.ofNullable(pipeconfMappingStore.getPipeconfId(deviceId));
     }
 
+
     private class PiPipeconfDriverProviderInternal implements DriverProvider {
 
         Driver driver;
@@ -245,7 +248,7 @@
         if (id.id().equals("")) {
             log.warn("Not adding empty pipeconfId for device {}", deviceId);
         } else {
-            devicesToPipeconf.put(deviceId, pipeconfConfig.piPipeconfId());
+            pipeconfMappingStore.createOrUpdateBinding(deviceId, id);
         }
     }