[ONOS-7892] Moving the ODTN SB cache to a distributed map.
Means to store the mapping between device config and ONOS core FR.

Change-Id: Idc5be6f3f40a142bdf5da79ce21c69ef5bfb12e7
diff --git a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/DeviceConnectionCache.java b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/DeviceConnectionCache.java
new file mode 100644
index 0000000..ba0ea76
--- /dev/null
+++ b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/DeviceConnectionCache.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * This work was partially supported by EC H2020 project METRO-HAUL (761727).
+ */
+
+package org.onosproject.drivers.odtn.impl;
+
+import org.onlab.osgi.DefaultServiceDirectory;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Stores a set of Rules for given open config based devices in order to properly report them to the store.
+ */
+public final class DeviceConnectionCache {
+    private static final Logger log =
+            LoggerFactory.getLogger(DeviceConnectionCache.class);
+
+    private static final StorageService STORAGE_SERVICE = DefaultServiceDirectory.getService(StorageService.class);
+
+    private static final String MAP_NAME = "onos-odtn-flow-cache";
+
+    private static final KryoNamespace SERIALIZER = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+            .register(DeviceConnection.class).build();
+
+    private EventuallyConsistentMap<DeviceId, Set<DeviceConnection>>
+            flowCache;
+
+    private static DeviceConnectionCache cache = null;
+    private static final Object CACHE_LOCK = new Object();
+
+    //banning public construction
+    private DeviceConnectionCache() {
+        flowCache = STORAGE_SERVICE
+                .<DeviceId, Set<DeviceConnection>>eventuallyConsistentMapBuilder()
+                .withName(MAP_NAME)
+                .withSerializer(SERIALIZER)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .build();
+    }
+
+    /**
+     * Initializes the cache if not already present.
+     * If present returns the existing one.
+     *
+     * @return single instance of cache
+     */
+    public static DeviceConnectionCache init() {
+        synchronized (CACHE_LOCK) {
+            if (cache == null) {
+                cache = new DeviceConnectionCache();
+            }
+        }
+        return cache;
+    }
+
+    /**
+     * Returns the number of rules stored for a given device.
+     *
+     * @param did the device
+     * @return number of flows stored
+     */
+    public int size(DeviceId did) {
+        if (!flowCache.containsKey(did)) {
+            return 0;
+        }
+        return flowCache.get(did).size();
+    }
+
+    /**
+     * Returns the flow with given Id for the specific device.
+     *
+     * @param did    device id
+     * @param flowId flow id
+     * @return the flow rule
+     */
+    public FlowRule get(DeviceId did, FlowId flowId) {
+        if (!flowCache.containsKey(did)) {
+            return null;
+        }
+        Set<DeviceConnection> set = flowCache.get(did);
+        DeviceConnection connection = set.stream()
+                .filter(c -> c.getFlowRule().id() == flowId)
+                .findFirst()
+                .orElse(null);
+        return connection != null ? connection.getFlowRule() : null;
+    }
+
+    /**
+     * Returns the flow with given Id for the specific device.
+     *
+     * @param did          device id
+     * @param connectionId the device specific connection id
+     * @return the flow rule
+     */
+    public FlowRule get(DeviceId did, String connectionId) {
+        if (!flowCache.containsKey(did)) {
+            return null;
+        }
+        Set<DeviceConnection> set = flowCache.get(did);
+        DeviceConnection connection = set.stream()
+                .filter(c -> c.getId().equals(connectionId))
+                .findFirst()
+                .orElse(null);
+        return connection != null ? connection.getFlowRule() : null;
+    }
+
+    /**
+     * Returns all the flows for the specific device.
+     *
+     * @param did device id
+     * @return Set of flow rules
+     */
+    public Set<FlowRule> get(DeviceId did) {
+        if (!flowCache.containsKey(did)) {
+            return null;
+        }
+        return flowCache.get(did).stream()
+                .map(DeviceConnection::getFlowRule)
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Add to a specific device a flow and a device specific connection id for that flow.
+     *
+     * @param did          device id
+     * @param connectionId the device specific connection identifier
+     * @param flowRule     the flow rule
+     */
+    public void add(DeviceId did, String connectionId, FlowRule flowRule) {
+        Set<DeviceConnection> set;
+        if (flowCache.containsKey(did)) {
+            set = flowCache.get(did);
+        } else {
+            set = new HashSet<>();
+            log.debug("DeviceConnectionCache created for {}", did);
+            flowCache.put(did, set);
+        }
+        set.add(DeviceConnection.of(connectionId, flowRule));
+    }
+
+    /**
+     * Add a flows for the specific device.
+     *
+     * @param did      device id
+     * @param flowRule the flow rule
+     */
+    public void remove(DeviceId did, FlowRule flowRule) {
+        if (!flowCache.containsKey(did)) {
+            return;
+        }
+        Set<DeviceConnection> set = flowCache.get(did);
+        set.removeIf(r2 -> r2.getFlowRule().id() == flowRule.id());
+    }
+
+    /**
+     * Add a flows for the specific device.
+     *
+     * @param did          device id
+     * @param connectionId the connectionId as identified on the Device
+     */
+    public void remove(DeviceId did, String connectionId) {
+        if (!flowCache.containsKey(did)) {
+            return;
+        }
+        Set<DeviceConnection> set = flowCache.get(did);
+        set.removeIf(r2 -> r2.getId().equals(connectionId));
+    }
+}