[ONOS-7892] Moving the ODTN SB cache to a distributed map.
Means to store the mapping between device config and ONOS core FR.
Change-Id: Idc5be6f3f40a142bdf5da79ce21c69ef5bfb12e7
(cherry picked from commit bb66e09d602003cf9b3febffd7828c0d25083923)
diff --git a/drivers/odtn-driver/BUILD b/drivers/odtn-driver/BUILD
index bb48507..fb1dbfc 100644
--- a/drivers/odtn-driver/BUILD
+++ b/drivers/odtn-driver/BUILD
@@ -1,5 +1,6 @@
COMPILE_DEPS = CORE_DEPS + [
"@commons_jxpath//jar",
+ "//core/store/serializers:onos-core-serializers",
"//drivers/utilities:onos-drivers-utilities",
"//protocols/netconf/api:onos-protocols-netconf-api",
"//protocols/rest/api:onos-protocols-rest-api",
diff --git a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/CassiniFlowRuleProgrammable.java b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/CassiniFlowRuleProgrammable.java
index bafa443..a17bdff 100644
--- a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/CassiniFlowRuleProgrammable.java
+++ b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/CassiniFlowRuleProgrammable.java
@@ -21,7 +21,7 @@
import com.google.common.collect.ImmutableList;
import org.onlab.util.Frequency;
import org.onosproject.drivers.odtn.impl.FlowRuleParser;
-import org.onosproject.drivers.odtn.impl.OpenConfigConnectionCache;
+import org.onosproject.drivers.odtn.impl.DeviceConnectionCache;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
@@ -71,13 +71,13 @@
List<FlowRule> added = new ArrayList<>();
for (FlowRule r : rules) {
try {
- applyFlowRule(session, r);
+ String connectionId = applyFlowRule(session, r);
+ getConnectionCache().add(did(), connectionId, r);
+ added.add(r);
} catch (Exception e) {
openConfigError("Error {}", e);
continue;
}
- getConnectionCache().add(did(), r);
- added.add(r);
}
openConfigLog("applyFlowRules added {}", added.size());
return added;
@@ -90,7 +90,7 @@
*/
@Override
public Collection<FlowEntry> getFlowEntries() {
- OpenConfigConnectionCache cache = getConnectionCache();
+ DeviceConnectionCache cache = getConnectionCache();
if (cache.get(did()) == null) {
return ImmutableList.of();
}
@@ -119,20 +119,20 @@
List<FlowRule> removed = new ArrayList<>();
for (FlowRule r : rules) {
try {
- removeFlowRule(session, r);
+ String connectionId = removeFlowRule(session, r);
+ getConnectionCache().remove(did(), connectionId);
+ removed.add(r);
} catch (Exception e) {
openConfigError("Error {}", e);
continue;
}
- getConnectionCache().add(did(), r);
- removed.add(r);
}
openConfigLog("removedFlowRules removed {}", removed.size());
return removed;
}
- private OpenConfigConnectionCache getConnectionCache() {
- return OpenConfigConnectionCache.init();
+ private DeviceConnectionCache getConnectionCache() {
+ return DeviceConnectionCache.init();
}
/**
@@ -221,23 +221,29 @@
*
* @param session The Netconf session.
* @param r Flow Rules to be applied.
+ * @return the optical channel + the frequency or just channel as identifier fo the config installed on the device
* @throws NetconfException if exchange goes wrong
*/
- protected void applyFlowRule(NetconfSession session, FlowRule r) throws NetconfException {
+ protected String applyFlowRule(NetconfSession session, FlowRule r) throws NetconfException {
FlowRuleParser frp = new FlowRuleParser(r);
if (!frp.isReceiver()) {
String optChannel = getOpticalChannel(frp.getPortNumber());
- setOpticalChannelFrequency(session, optChannel, frp.getCentralFrequency());
+ setOpticalChannelFrequency(session, optChannel,
+ frp.getCentralFrequency());
+ return optChannel + ":" + frp.getCentralFrequency().asGHz();
}
+ return String.valueOf(frp.getCentralFrequency().asGHz());
}
- protected void removeFlowRule(NetconfSession session, FlowRule r)
+ protected String removeFlowRule(NetconfSession session, FlowRule r)
throws NetconfException {
FlowRuleParser frp = new FlowRuleParser(r);
if (!frp.isReceiver()) {
String optChannel = getOpticalChannel(frp.getPortNumber());
setOpticalChannelFrequency(session, optChannel, Frequency.ofMHz(0));
+ return optChannel + ":" + frp.getCentralFrequency().asGHz();
}
+ return String.valueOf(frp.getCentralFrequency().asGHz());
}
}
diff --git a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/DeviceConnection.java b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/DeviceConnection.java
new file mode 100644
index 0000000..290773f
--- /dev/null
+++ b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/DeviceConnection.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * This work was partially supported by EC H2020 project METRO-HAUL (761727).
+ */
+package org.onosproject.drivers.odtn.impl;
+
+import org.onosproject.net.flow.FlowRule;
+
+import java.util.Objects;
+
+/**
+ * Connection consisting of a unique identifier of the connection on the device and the corresponding rule within ONOS.
+ */
+public final class DeviceConnection {
+
+ private String id;
+ private FlowRule fr;
+
+ //Avoiding public construction
+ private DeviceConnection(){}
+
+ /**
+ * Creates the Device connection object.
+ *
+ * @param id the unique id of the connection on the device
+ * @param fr the flow rule in ONOS
+ */
+ private DeviceConnection(String id, FlowRule fr) {
+ this.id = id;
+ this.fr = fr;
+ }
+
+ /**
+ * Creates the Device connection object.
+ *
+ * @param id the unique id of the connection on the device
+ * @param fr the flow rule in ONOS
+ * @return the DeviceConnection object.
+ */
+ public static DeviceConnection of(String id, FlowRule fr) {
+ return new DeviceConnection(id, fr);
+ }
+
+ /**
+ * Gets the unique id of the connection on the device.
+ * E.g TAPI UUID of the connectivity service.
+ *
+ * @return the unique id on the device.
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * Gets the flow rule associated to the unique id on the device.
+ * The Flow Rule contains the info of the given connection as needed by ONOS.
+ *
+ * @return the flow rule
+ */
+ public FlowRule getFlowRule() {
+ return fr;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DeviceConnection that = (DeviceConnection) o;
+ return Objects.equals(id, that.id);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, fr);
+ }
+}
diff --git a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/DeviceConnectionCache.java b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/DeviceConnectionCache.java
new file mode 100644
index 0000000..ba0ea76
--- /dev/null
+++ b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/DeviceConnectionCache.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * This work was partially supported by EC H2020 project METRO-HAUL (761727).
+ */
+
+package org.onosproject.drivers.odtn.impl;
+
+import org.onlab.osgi.DefaultServiceDirectory;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Stores a set of Rules for given open config based devices in order to properly report them to the store.
+ */
+public final class DeviceConnectionCache {
+ private static final Logger log =
+ LoggerFactory.getLogger(DeviceConnectionCache.class);
+
+ private static final StorageService STORAGE_SERVICE = DefaultServiceDirectory.getService(StorageService.class);
+
+ private static final String MAP_NAME = "onos-odtn-flow-cache";
+
+ private static final KryoNamespace SERIALIZER = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ .register(DeviceConnection.class).build();
+
+ private EventuallyConsistentMap<DeviceId, Set<DeviceConnection>>
+ flowCache;
+
+ private static DeviceConnectionCache cache = null;
+ private static final Object CACHE_LOCK = new Object();
+
+ //banning public construction
+ private DeviceConnectionCache() {
+ flowCache = STORAGE_SERVICE
+ .<DeviceId, Set<DeviceConnection>>eventuallyConsistentMapBuilder()
+ .withName(MAP_NAME)
+ .withSerializer(SERIALIZER)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
+ .build();
+ }
+
+ /**
+ * Initializes the cache if not already present.
+ * If present returns the existing one.
+ *
+ * @return single instance of cache
+ */
+ public static DeviceConnectionCache init() {
+ synchronized (CACHE_LOCK) {
+ if (cache == null) {
+ cache = new DeviceConnectionCache();
+ }
+ }
+ return cache;
+ }
+
+ /**
+ * Returns the number of rules stored for a given device.
+ *
+ * @param did the device
+ * @return number of flows stored
+ */
+ public int size(DeviceId did) {
+ if (!flowCache.containsKey(did)) {
+ return 0;
+ }
+ return flowCache.get(did).size();
+ }
+
+ /**
+ * Returns the flow with given Id for the specific device.
+ *
+ * @param did device id
+ * @param flowId flow id
+ * @return the flow rule
+ */
+ public FlowRule get(DeviceId did, FlowId flowId) {
+ if (!flowCache.containsKey(did)) {
+ return null;
+ }
+ Set<DeviceConnection> set = flowCache.get(did);
+ DeviceConnection connection = set.stream()
+ .filter(c -> c.getFlowRule().id() == flowId)
+ .findFirst()
+ .orElse(null);
+ return connection != null ? connection.getFlowRule() : null;
+ }
+
+ /**
+ * Returns the flow with given Id for the specific device.
+ *
+ * @param did device id
+ * @param connectionId the device specific connection id
+ * @return the flow rule
+ */
+ public FlowRule get(DeviceId did, String connectionId) {
+ if (!flowCache.containsKey(did)) {
+ return null;
+ }
+ Set<DeviceConnection> set = flowCache.get(did);
+ DeviceConnection connection = set.stream()
+ .filter(c -> c.getId().equals(connectionId))
+ .findFirst()
+ .orElse(null);
+ return connection != null ? connection.getFlowRule() : null;
+ }
+
+ /**
+ * Returns all the flows for the specific device.
+ *
+ * @param did device id
+ * @return Set of flow rules
+ */
+ public Set<FlowRule> get(DeviceId did) {
+ if (!flowCache.containsKey(did)) {
+ return null;
+ }
+ return flowCache.get(did).stream()
+ .map(DeviceConnection::getFlowRule)
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Add to a specific device a flow and a device specific connection id for that flow.
+ *
+ * @param did device id
+ * @param connectionId the device specific connection identifier
+ * @param flowRule the flow rule
+ */
+ public void add(DeviceId did, String connectionId, FlowRule flowRule) {
+ Set<DeviceConnection> set;
+ if (flowCache.containsKey(did)) {
+ set = flowCache.get(did);
+ } else {
+ set = new HashSet<>();
+ log.debug("DeviceConnectionCache created for {}", did);
+ flowCache.put(did, set);
+ }
+ set.add(DeviceConnection.of(connectionId, flowRule));
+ }
+
+ /**
+ * Add a flows for the specific device.
+ *
+ * @param did device id
+ * @param flowRule the flow rule
+ */
+ public void remove(DeviceId did, FlowRule flowRule) {
+ if (!flowCache.containsKey(did)) {
+ return;
+ }
+ Set<DeviceConnection> set = flowCache.get(did);
+ set.removeIf(r2 -> r2.getFlowRule().id() == flowRule.id());
+ }
+
+ /**
+ * Add a flows for the specific device.
+ *
+ * @param did device id
+ * @param connectionId the connectionId as identified on the Device
+ */
+ public void remove(DeviceId did, String connectionId) {
+ if (!flowCache.containsKey(did)) {
+ return;
+ }
+ Set<DeviceConnection> set = flowCache.get(did);
+ set.removeIf(r2 -> r2.getId().equals(connectionId));
+ }
+}
diff --git a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/OpenConfigConnectionCache.java b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/OpenConfigConnectionCache.java
deleted file mode 100644
index 95adebc..0000000
--- a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/impl/OpenConfigConnectionCache.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * This work was partially supported by EC H2020 project METRO-HAUL (761727).
- */
-
-package org.onosproject.drivers.odtn.impl;
-
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.flow.FlowId;
-import org.onosproject.net.flow.FlowRule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Stores a set of Rules for given open config based devices in order to properly report them to the store.
- */
-public final class OpenConfigConnectionCache {
- private static final Logger log =
- LoggerFactory.getLogger(OpenConfigConnectionCache.class);
-
- private Map<DeviceId, Set<FlowRule>> mp = new HashMap<>();
- private Map<DeviceId, Set<FlowRule>> smp = Collections.synchronizedMap(mp);
-
- private static OpenConfigConnectionCache cache = null;
- private static final Object CACHE_LOCK = new Object();
-
- //banning public construction
- private OpenConfigConnectionCache() {
- }
-
- /**
- * Initializes the cache if not already present.
- * If present returns the existing one.
- *
- * @return single instance of cache
- */
- public static OpenConfigConnectionCache init() {
- synchronized (CACHE_LOCK) {
- if (cache == null) {
- cache = new OpenConfigConnectionCache();
- }
- }
- return cache;
- }
-
- /**
- * Returns the number of rules stored for a given device.
- *
- * @param did the device
- * @return number of flows stored
- */
- public int size(DeviceId did) {
- synchronized (smp) {
- if (!smp.containsKey(did)) {
- return 0;
- }
- return smp.get(did).size();
- }
- }
-
- /**
- * Returns the flow with given Id for the specific device.
- *
- * @param did device id
- * @param flowId flow id
- * @return the flow rule
- */
- public FlowRule get(DeviceId did, FlowId flowId) {
- synchronized (smp) {
- if (!smp.containsKey(did)) {
- return null;
- }
- Set<FlowRule> set = smp.get(did);
- return set.stream()
- .filter(r -> r.id() == flowId)
- .findFirst()
- .orElse(null);
- }
- }
-
- /**
- * Returns all the flows for the specific device.
- *
- * @param did device id
- * @return Set of flow rules
- */
- public Set<FlowRule> get(DeviceId did) {
- synchronized (smp) {
- if (!smp.containsKey(did)) {
- return null;
- }
- return smp.get(did);
- }
- }
-
- /**
- * Add a flows for the specific device.
- *
- * @param did device id
- * @param flowRule the flow rule
- */
- public void add(DeviceId did, FlowRule flowRule) {
- synchronized (smp) {
- Set<FlowRule> set;
- if (smp.containsKey(did)) {
- set = smp.get(did);
- } else {
- set = new HashSet<FlowRule>();
- log.warn("OpenConfigConnectionCache created for {}", did);
- smp.put(did, set);
- }
- set.add(flowRule);
- }
- }
-
- /**
- * Add a flows for the specific device.
- *
- * @param did device id
- * @param flowRule the flow rule
- */
- public void remove(DeviceId did, FlowRule flowRule) {
- synchronized (smp) {
- if (!smp.containsKey(did)) {
- return;
- }
- Set<FlowRule> set = smp.get(did);
- set.removeIf(r2 -> r2.id() == flowRule.id());
- }
- }
-}
diff --git a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/openconfig/TerminalDeviceFlowRuleProgrammable.java b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/openconfig/TerminalDeviceFlowRuleProgrammable.java
index d6f8aa2..0d0787e 100644
--- a/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/openconfig/TerminalDeviceFlowRuleProgrammable.java
+++ b/drivers/odtn-driver/src/main/java/org/onosproject/drivers/odtn/openconfig/TerminalDeviceFlowRuleProgrammable.java
@@ -20,8 +20,8 @@
import com.google.common.collect.ImmutableList;
import org.onlab.util.Frequency;
+import org.onosproject.drivers.odtn.impl.DeviceConnectionCache;
import org.onosproject.drivers.odtn.impl.FlowRuleParser;
-import org.onosproject.drivers.odtn.impl.OpenConfigConnectionCache;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.driver.AbstractHandlerBehaviour;
@@ -83,13 +83,13 @@
List<FlowRule> added = new ArrayList<>();
for (FlowRule r : rules) {
try {
- applyFlowRule(session, r);
+ String connectionId = applyFlowRule(session, r);
+ getConnectionCache().add(did(), connectionId, r);
+ added.add(r);
} catch (Exception e) {
openConfigError("Error {}", e);
continue;
}
- getConnectionCache().add(did(), r);
- added.add(r);
}
openConfigLog("applyFlowRules added {}", added.size());
return added;
@@ -102,7 +102,7 @@
*/
@Override
public Collection<FlowEntry> getFlowEntries() {
- OpenConfigConnectionCache cache = getConnectionCache();
+ DeviceConnectionCache cache = getConnectionCache();
if (cache.get(did()) == null) {
return ImmutableList.of();
}
@@ -131,20 +131,20 @@
List<FlowRule> removed = new ArrayList<>();
for (FlowRule r : rules) {
try {
- removeFlowRule(session, r);
+ String connectionId = removeFlowRule(session, r);
+ getConnectionCache().remove(did(), connectionId);
+ removed.add(r);
} catch (Exception e) {
openConfigError("Error {}", e);
continue;
}
- getConnectionCache().add(did(), r);
- removed.add(r);
}
openConfigLog("removedFlowRules removed {}", removed.size());
return removed;
}
- private OpenConfigConnectionCache getConnectionCache() {
- return OpenConfigConnectionCache.init();
+ private DeviceConnectionCache getConnectionCache() {
+ return DeviceConnectionCache.init();
}
// Context so XPath expressions are aware of XML namespaces
@@ -346,7 +346,7 @@
/**
* Apply the flowrule.
- *
+ * <p>
* Note: only bidirectional are supported as of now,
* given OpenConfig note (below). In consequence, only the
* TX rules are actually mapped to netconf ops.
@@ -362,24 +362,29 @@
*
* @param session The Netconf session.
* @param r Flow Rules to be applied.
+ * @return the optical channel + the frequency or just channel as identifier fo the config installed on the device
* @throws NetconfException if exchange goes wrong
*/
- protected void applyFlowRule(NetconfSession session, FlowRule r) throws NetconfException {
+ protected String applyFlowRule(NetconfSession session, FlowRule r) throws NetconfException {
FlowRuleParser frp = new FlowRuleParser(r);
if (!frp.isReceiver()) {
String optChannel = getOpticalChannel(session, frp.getPortNumber());
setOpticalChannelFrequency(session, optChannel,
frp.getCentralFrequency());
+ return optChannel + ":" + frp.getCentralFrequency().asGHz();
}
+ return String.valueOf(frp.getCentralFrequency().asGHz());
}
- protected void removeFlowRule(NetconfSession session, FlowRule r)
+ protected String removeFlowRule(NetconfSession session, FlowRule r)
throws NetconfException {
FlowRuleParser frp = new FlowRuleParser(r);
if (!frp.isReceiver()) {
String optChannel = getOpticalChannel(session, frp.getPortNumber());
setOpticalChannelFrequency(session, optChannel, Frequency.ofMHz(0));
+ return optChannel + ":" + frp.getCentralFrequency().asGHz();
}
+ return String.valueOf(frp.getCentralFrequency().asGHz());
}
}