[ONOS-7892] Moving the ODTN SB cache to a distributed map.
Means to store the mapping between device config and ONOS core FR.

Change-Id: Idc5be6f3f40a142bdf5da79ce21c69ef5bfb12e7
(cherry picked from commit bb66e09d602003cf9b3febffd7828c0d25083923)
diff --git a/drivers/odtn-driver/BUILD b/drivers/odtn-driver/BUILD
index bb48507..fb1dbfc 100644
--- a/drivers/odtn-driver/BUILD
+++ b/drivers/odtn-driver/BUILD
@@ -1,5 +1,6 @@
 COMPILE_DEPS = CORE_DEPS + [
     "@commons_jxpath//jar",
+    "//core/store/serializers:onos-core-serializers",
     "//drivers/utilities:onos-drivers-utilities",
     "//protocols/netconf/api:onos-protocols-netconf-api",
     "//protocols/rest/api:onos-protocols-rest-api",
diff --git a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/CassiniFlowRuleProgrammable.java b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/CassiniFlowRuleProgrammable.java
index bafa443..a17bdff 100644
--- a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/CassiniFlowRuleProgrammable.java
+++ b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/CassiniFlowRuleProgrammable.java
@@ -21,7 +21,7 @@
 import com.google.common.collect.ImmutableList;
 import org.onlab.util.Frequency;
 import org.onosproject.drivers.odtn.impl.FlowRuleParser;
-import org.onosproject.drivers.odtn.impl.OpenConfigConnectionCache;
+import org.onosproject.drivers.odtn.impl.DeviceConnectionCache;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
@@ -71,13 +71,13 @@
         List<FlowRule> added = new ArrayList<>();
         for (FlowRule r : rules) {
             try {
-                applyFlowRule(session, r);
+                String connectionId = applyFlowRule(session, r);
+                getConnectionCache().add(did(), connectionId, r);
+                added.add(r);
             } catch (Exception e) {
                 openConfigError("Error {}", e);
                 continue;
             }
-            getConnectionCache().add(did(), r);
-            added.add(r);
         }
         openConfigLog("applyFlowRules added {}", added.size());
         return added;
@@ -90,7 +90,7 @@
      */
     @Override
     public Collection<FlowEntry> getFlowEntries() {
-        OpenConfigConnectionCache cache = getConnectionCache();
+        DeviceConnectionCache cache = getConnectionCache();
         if (cache.get(did()) == null) {
             return ImmutableList.of();
         }
@@ -119,20 +119,20 @@
         List<FlowRule> removed = new ArrayList<>();
         for (FlowRule r : rules) {
             try {
-                removeFlowRule(session, r);
+                String connectionId = removeFlowRule(session, r);
+                getConnectionCache().remove(did(), connectionId);
+                removed.add(r);
             } catch (Exception e) {
                 openConfigError("Error {}", e);
                 continue;
             }
-            getConnectionCache().add(did(), r);
-            removed.add(r);
         }
         openConfigLog("removedFlowRules removed {}", removed.size());
         return removed;
     }
 
-    private OpenConfigConnectionCache getConnectionCache() {
-        return OpenConfigConnectionCache.init();
+    private DeviceConnectionCache getConnectionCache() {
+        return DeviceConnectionCache.init();
     }
 
     /**
@@ -221,23 +221,29 @@
      *
      * @param session The Netconf session.
      * @param r       Flow Rules to be applied.
+     * @return the optical channel + the frequency or just channel as identifier fo the config installed on the device
      * @throws NetconfException if exchange goes wrong
      */
-    protected void applyFlowRule(NetconfSession session, FlowRule r) throws NetconfException {
+    protected String applyFlowRule(NetconfSession session, FlowRule r) throws NetconfException {
         FlowRuleParser frp = new FlowRuleParser(r);
         if (!frp.isReceiver()) {
             String optChannel = getOpticalChannel(frp.getPortNumber());
-            setOpticalChannelFrequency(session, optChannel, frp.getCentralFrequency());
+            setOpticalChannelFrequency(session, optChannel,
+                    frp.getCentralFrequency());
+            return optChannel + ":" + frp.getCentralFrequency().asGHz();
         }
+        return String.valueOf(frp.getCentralFrequency().asGHz());
     }
 
 
-    protected void removeFlowRule(NetconfSession session, FlowRule r)
+    protected String removeFlowRule(NetconfSession session, FlowRule r)
             throws NetconfException {
         FlowRuleParser frp = new FlowRuleParser(r);
         if (!frp.isReceiver()) {
             String optChannel = getOpticalChannel(frp.getPortNumber());
             setOpticalChannelFrequency(session, optChannel, Frequency.ofMHz(0));
+            return optChannel + ":" + frp.getCentralFrequency().asGHz();
         }
+        return String.valueOf(frp.getCentralFrequency().asGHz());
     }
 }
diff --git a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/DeviceConnection.java b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/DeviceConnection.java
new file mode 100644
index 0000000..290773f
--- /dev/null
+++ b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/DeviceConnection.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * This work was partially supported by EC H2020 project METRO-HAUL (761727).
+ */
+package org.onosproject.drivers.odtn.impl;
+
+import org.onosproject.net.flow.FlowRule;
+
+import java.util.Objects;
+
+/**
+ * Connection consisting of a unique identifier of the connection on the device and the corresponding rule within ONOS.
+ */
+public final class DeviceConnection {
+
+    private String id;
+    private FlowRule fr;
+
+    //Avoiding public construction
+    private DeviceConnection(){}
+
+    /**
+     * Creates the Device connection object.
+     *
+     * @param id the unique id of the connection on the device
+     * @param fr the flow rule in ONOS
+     */
+    private DeviceConnection(String id, FlowRule fr) {
+        this.id = id;
+        this.fr = fr;
+    }
+
+    /**
+     * Creates the Device connection object.
+     *
+     * @param id the unique id of the connection on the device
+     * @param fr the flow rule in ONOS
+     * @return the DeviceConnection object.
+     */
+    public static DeviceConnection of(String id, FlowRule fr) {
+        return new DeviceConnection(id, fr);
+    }
+
+    /**
+     * Gets the unique id of the connection on the device.
+     * E.g TAPI UUID of the connectivity service.
+     *
+     * @return the unique id on the device.
+     */
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * Gets the flow rule associated to the unique id on the device.
+     * The Flow Rule contains the info of the given connection as needed by ONOS.
+     *
+     * @return the flow rule
+     */
+    public FlowRule getFlowRule() {
+        return fr;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DeviceConnection that = (DeviceConnection) o;
+        return Objects.equals(id, that.id);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, fr);
+    }
+}
diff --git a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/DeviceConnectionCache.java b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/DeviceConnectionCache.java
new file mode 100644
index 0000000..ba0ea76
--- /dev/null
+++ b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/DeviceConnectionCache.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * This work was partially supported by EC H2020 project METRO-HAUL (761727).
+ */
+
+package org.onosproject.drivers.odtn.impl;
+
+import org.onlab.osgi.DefaultServiceDirectory;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Stores a set of Rules for given open config based devices in order to properly report them to the store.
+ */
+public final class DeviceConnectionCache {
+    private static final Logger log =
+            LoggerFactory.getLogger(DeviceConnectionCache.class);
+
+    private static final StorageService STORAGE_SERVICE = DefaultServiceDirectory.getService(StorageService.class);
+
+    private static final String MAP_NAME = "onos-odtn-flow-cache";
+
+    private static final KryoNamespace SERIALIZER = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+            .register(DeviceConnection.class).build();
+
+    private EventuallyConsistentMap<DeviceId, Set<DeviceConnection>>
+            flowCache;
+
+    private static DeviceConnectionCache cache = null;
+    private static final Object CACHE_LOCK = new Object();
+
+    //banning public construction
+    private DeviceConnectionCache() {
+        flowCache = STORAGE_SERVICE
+                .<DeviceId, Set<DeviceConnection>>eventuallyConsistentMapBuilder()
+                .withName(MAP_NAME)
+                .withSerializer(SERIALIZER)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .build();
+    }
+
+    /**
+     * Initializes the cache if not already present.
+     * If present returns the existing one.
+     *
+     * @return single instance of cache
+     */
+    public static DeviceConnectionCache init() {
+        synchronized (CACHE_LOCK) {
+            if (cache == null) {
+                cache = new DeviceConnectionCache();
+            }
+        }
+        return cache;
+    }
+
+    /**
+     * Returns the number of rules stored for a given device.
+     *
+     * @param did the device
+     * @return number of flows stored
+     */
+    public int size(DeviceId did) {
+        if (!flowCache.containsKey(did)) {
+            return 0;
+        }
+        return flowCache.get(did).size();
+    }
+
+    /**
+     * Returns the flow with given Id for the specific device.
+     *
+     * @param did    device id
+     * @param flowId flow id
+     * @return the flow rule
+     */
+    public FlowRule get(DeviceId did, FlowId flowId) {
+        if (!flowCache.containsKey(did)) {
+            return null;
+        }
+        Set<DeviceConnection> set = flowCache.get(did);
+        DeviceConnection connection = set.stream()
+                .filter(c -> c.getFlowRule().id() == flowId)
+                .findFirst()
+                .orElse(null);
+        return connection != null ? connection.getFlowRule() : null;
+    }
+
+    /**
+     * Returns the flow with given Id for the specific device.
+     *
+     * @param did          device id
+     * @param connectionId the device specific connection id
+     * @return the flow rule
+     */
+    public FlowRule get(DeviceId did, String connectionId) {
+        if (!flowCache.containsKey(did)) {
+            return null;
+        }
+        Set<DeviceConnection> set = flowCache.get(did);
+        DeviceConnection connection = set.stream()
+                .filter(c -> c.getId().equals(connectionId))
+                .findFirst()
+                .orElse(null);
+        return connection != null ? connection.getFlowRule() : null;
+    }
+
+    /**
+     * Returns all the flows for the specific device.
+     *
+     * @param did device id
+     * @return Set of flow rules
+     */
+    public Set<FlowRule> get(DeviceId did) {
+        if (!flowCache.containsKey(did)) {
+            return null;
+        }
+        return flowCache.get(did).stream()
+                .map(DeviceConnection::getFlowRule)
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Add to a specific device a flow and a device specific connection id for that flow.
+     *
+     * @param did          device id
+     * @param connectionId the device specific connection identifier
+     * @param flowRule     the flow rule
+     */
+    public void add(DeviceId did, String connectionId, FlowRule flowRule) {
+        Set<DeviceConnection> set;
+        if (flowCache.containsKey(did)) {
+            set = flowCache.get(did);
+        } else {
+            set = new HashSet<>();
+            log.debug("DeviceConnectionCache created for {}", did);
+            flowCache.put(did, set);
+        }
+        set.add(DeviceConnection.of(connectionId, flowRule));
+    }
+
+    /**
+     * Add a flows for the specific device.
+     *
+     * @param did      device id
+     * @param flowRule the flow rule
+     */
+    public void remove(DeviceId did, FlowRule flowRule) {
+        if (!flowCache.containsKey(did)) {
+            return;
+        }
+        Set<DeviceConnection> set = flowCache.get(did);
+        set.removeIf(r2 -> r2.getFlowRule().id() == flowRule.id());
+    }
+
+    /**
+     * Add a flows for the specific device.
+     *
+     * @param did          device id
+     * @param connectionId the connectionId as identified on the Device
+     */
+    public void remove(DeviceId did, String connectionId) {
+        if (!flowCache.containsKey(did)) {
+            return;
+        }
+        Set<DeviceConnection> set = flowCache.get(did);
+        set.removeIf(r2 -> r2.getId().equals(connectionId));
+    }
+}
diff --git a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/OpenConfigConnectionCache.java b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/OpenConfigConnectionCache.java
deleted file mode 100644
index 95adebc..0000000
--- a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/OpenConfigConnectionCache.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * This work was partially supported by EC H2020 project METRO-HAUL (761727).
- */
-
-package org.onosproject.drivers.odtn.impl;
-
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.flow.FlowId;
-import org.onosproject.net.flow.FlowRule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Stores a set of Rules for given open config based devices in order to properly report them to the store.
- */
-public final class OpenConfigConnectionCache {
-    private static final Logger log =
-            LoggerFactory.getLogger(OpenConfigConnectionCache.class);
-
-    private Map<DeviceId, Set<FlowRule>> mp = new HashMap<>();
-    private Map<DeviceId, Set<FlowRule>> smp = Collections.synchronizedMap(mp);
-
-    private static OpenConfigConnectionCache cache = null;
-    private static final Object CACHE_LOCK = new Object();
-
-    //banning public construction
-    private OpenConfigConnectionCache() {
-    }
-
-    /**
-     * Initializes the cache if not already present.
-     * If present returns the existing one.
-     *
-     * @return single instance of cache
-     */
-    public static OpenConfigConnectionCache init() {
-        synchronized (CACHE_LOCK) {
-            if (cache == null) {
-                cache = new OpenConfigConnectionCache();
-            }
-        }
-        return cache;
-    }
-
-    /**
-     * Returns the number of rules stored for a given device.
-     *
-     * @param did the device
-     * @return number of flows stored
-     */
-    public int size(DeviceId did) {
-        synchronized (smp) {
-            if (!smp.containsKey(did)) {
-                return 0;
-            }
-            return smp.get(did).size();
-        }
-    }
-
-    /**
-     * Returns the flow with given Id for the specific device.
-     *
-     * @param did    device id
-     * @param flowId flow id
-     * @return the flow rule
-     */
-    public FlowRule get(DeviceId did, FlowId flowId) {
-        synchronized (smp) {
-            if (!smp.containsKey(did)) {
-                return null;
-            }
-            Set<FlowRule> set = smp.get(did);
-            return set.stream()
-                    .filter(r -> r.id() == flowId)
-                    .findFirst()
-                    .orElse(null);
-        }
-    }
-
-    /**
-     * Returns all the flows for the specific device.
-     *
-     * @param did device id
-     * @return Set of flow rules
-     */
-    public Set<FlowRule> get(DeviceId did) {
-        synchronized (smp) {
-            if (!smp.containsKey(did)) {
-                return null;
-            }
-            return smp.get(did);
-        }
-    }
-
-    /**
-     * Add a flows for the specific device.
-     *
-     * @param did      device id
-     * @param flowRule the flow rule
-     */
-    public void add(DeviceId did, FlowRule flowRule) {
-        synchronized (smp) {
-            Set<FlowRule> set;
-            if (smp.containsKey(did)) {
-                set = smp.get(did);
-            } else {
-                set = new HashSet<FlowRule>();
-                log.warn("OpenConfigConnectionCache created for {}", did);
-                smp.put(did, set);
-            }
-            set.add(flowRule);
-        }
-    }
-
-    /**
-     * Add a flows for the specific device.
-     *
-     * @param did      device id
-     * @param flowRule the flow rule
-     */
-    public void remove(DeviceId did, FlowRule flowRule) {
-        synchronized (smp) {
-            if (!smp.containsKey(did)) {
-                return;
-            }
-            Set<FlowRule> set = smp.get(did);
-            set.removeIf(r2 -> r2.id() == flowRule.id());
-        }
-    }
-}
diff --git a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/openconfig/TerminalDeviceFlowRuleProgrammable.java b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/openconfig/TerminalDeviceFlowRuleProgrammable.java
index d6f8aa2..0d0787e 100644
--- a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/openconfig/TerminalDeviceFlowRuleProgrammable.java
+++ b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/openconfig/TerminalDeviceFlowRuleProgrammable.java
@@ -20,8 +20,8 @@
 
 import com.google.common.collect.ImmutableList;
 import org.onlab.util.Frequency;
+import org.onosproject.drivers.odtn.impl.DeviceConnectionCache;
 import org.onosproject.drivers.odtn.impl.FlowRuleParser;
-import org.onosproject.drivers.odtn.impl.OpenConfigConnectionCache;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
@@ -83,13 +83,13 @@
         List<FlowRule> added = new ArrayList<>();
         for (FlowRule r : rules) {
             try {
-                applyFlowRule(session, r);
+                String connectionId = applyFlowRule(session, r);
+                getConnectionCache().add(did(), connectionId, r);
+                added.add(r);
             } catch (Exception e) {
                 openConfigError("Error {}", e);
                 continue;
             }
-            getConnectionCache().add(did(), r);
-            added.add(r);
         }
         openConfigLog("applyFlowRules added {}", added.size());
         return added;
@@ -102,7 +102,7 @@
      */
     @Override
     public Collection<FlowEntry> getFlowEntries() {
-        OpenConfigConnectionCache cache = getConnectionCache();
+        DeviceConnectionCache cache = getConnectionCache();
         if (cache.get(did()) == null) {
             return ImmutableList.of();
         }
@@ -131,20 +131,20 @@
         List<FlowRule> removed = new ArrayList<>();
         for (FlowRule r : rules) {
             try {
-                removeFlowRule(session, r);
+                String connectionId = removeFlowRule(session, r);
+                getConnectionCache().remove(did(), connectionId);
+                removed.add(r);
             } catch (Exception e) {
                 openConfigError("Error {}", e);
                 continue;
             }
-            getConnectionCache().add(did(), r);
-            removed.add(r);
         }
         openConfigLog("removedFlowRules removed {}", removed.size());
         return removed;
     }
 
-    private OpenConfigConnectionCache getConnectionCache() {
-        return OpenConfigConnectionCache.init();
+    private DeviceConnectionCache getConnectionCache() {
+        return DeviceConnectionCache.init();
     }
 
     // Context so XPath expressions are aware of XML namespaces
@@ -346,7 +346,7 @@
 
     /**
      * Apply the flowrule.
-     *
+     * <p>
      * Note: only bidirectional are supported as of now,
      * given OpenConfig note (below). In consequence, only the
      * TX rules are actually mapped to netconf ops.
@@ -362,24 +362,29 @@
      *
      * @param session The Netconf session.
      * @param r       Flow Rules to be applied.
+     * @return the optical channel + the frequency or just channel as identifier fo the config installed on the device
      * @throws NetconfException if exchange goes wrong
      */
-    protected void applyFlowRule(NetconfSession session, FlowRule r) throws NetconfException {
+    protected String applyFlowRule(NetconfSession session, FlowRule r) throws NetconfException {
         FlowRuleParser frp = new FlowRuleParser(r);
         if (!frp.isReceiver()) {
             String optChannel = getOpticalChannel(session, frp.getPortNumber());
             setOpticalChannelFrequency(session, optChannel,
                     frp.getCentralFrequency());
+            return optChannel + ":" + frp.getCentralFrequency().asGHz();
         }
+        return String.valueOf(frp.getCentralFrequency().asGHz());
     }
 
 
-    protected void removeFlowRule(NetconfSession session, FlowRule r)
+    protected String removeFlowRule(NetconfSession session, FlowRule r)
             throws NetconfException {
         FlowRuleParser frp = new FlowRuleParser(r);
         if (!frp.isReceiver()) {
             String optChannel = getOpticalChannel(session, frp.getPortNumber());
             setOpticalChannelFrequency(session, optChannel, Frequency.ofMHz(0));
+            return optChannel + ":" + frp.getCentralFrequency().asGHz();
         }
+        return String.valueOf(frp.getCentralFrequency().asGHz());
     }
 }