[SDFAB-359] Allow purging flows, groups and meters by device and application ID
Change-Id: I5e507d230789979ac997dbc99697fa0483363f70
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index c438885..a257cbd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -33,19 +33,21 @@
import java.util.stream.Collectors;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.FlowRuleStoreException;
+import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
@@ -960,6 +962,51 @@
}
/**
+ * Purges the flows with the given application id.
+ *
+ * @param appId the application id
+ * @return a future to be completed once flow rules with given application
+ * id have been purged on all buckets
+ */
+ public CompletableFuture<Void> purge(ApplicationId appId) {
+ DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+ if (!replicaInfo.isMaster(localNodeId)) {
+ return Tools.exceptionalFuture(new IllegalStateException());
+ }
+
+ // If the master's term is not currently active (has not been synchronized
+ // with prior replicas), enqueue the changes to be executed once the master
+ // has been synchronized.
+ final long term = replicaInfo.term();
+ List<CompletableFuture<Void>> completablePurges = Lists.newArrayList();
+ if (activeTerm < term) {
+ log.debug("Enqueueing operations for device {}", deviceId);
+ flowBuckets.values().forEach(
+ bucket -> {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ completablePurges.add(future);
+ flowTasks.computeIfAbsent(bucket.bucketId().bucket(),
+ b -> new LinkedList<>())
+ .add(() -> future.complete(apply((bkt, trm) -> {
+ bkt.purge(appId, trm, clock);
+ return null;
+ }, bucket, term)));
+ });
+
+ } else {
+ flowBuckets.values().forEach(bucket -> {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ completablePurges.add(future);
+ future.complete(apply((bkt, trm) -> {
+ bkt.purge(appId, trm, clock);
+ return null;
+ }, bucket, term));
+ });
+ }
+ return CompletableFuture.allOf(completablePurges.toArray(new CompletableFuture[0]));
+ }
+
+ /**
* Closes the device flow table.
*/
public void close() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 0a96252..e4850d8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -44,6 +44,7 @@
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.core.IdGenerator;
import org.onosproject.event.AbstractListenerManager;
@@ -63,8 +64,8 @@
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
-import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.FlowRuleStoreException;
+import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.TableStatisticsEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
@@ -110,6 +111,7 @@
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
+import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.PURGE_FLOW_RULES;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
import static org.slf4j.LoggerFactory.getLogger;
@@ -137,6 +139,7 @@
private final Logger log = getLogger(getClass());
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
+ private static final long PURGE_TIMEOUT_MILLIS = 30000;
private static final int GET_FLOW_ENTRIES_TIMEOUT = 30; //seconds
/** Number of threads in the message handler pool. */
@@ -340,6 +343,11 @@
serializer::encode, executor);
clusterCommunicator.addSubscriber(
REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
+ clusterCommunicator.<Pair<DeviceId, ApplicationId>, Boolean>addSubscriber(
+ PURGE_FLOW_RULES,
+ serializer::decode,
+ p -> flowTable.purgeFlowRules(p.getLeft(), p.getRight()),
+ serializer::encode, executor);
}
private void unregisterMessageHandlers() {
@@ -634,6 +642,35 @@
}
@Override
+ public boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+ NodeId master = mastershipService.getMasterFor(deviceId);
+
+ if (Objects.equals(local, master)) {
+ // bypass and handle it locally
+ return flowTable.purgeFlowRules(deviceId, appId);
+ }
+
+ if (master == null) {
+ log.warn("Failed to purgeFlowRules: No master for {}", deviceId);
+ return false;
+ }
+
+ log.trace("Forwarding purgeFlowRules to {}, which is the master for device {}",
+ master, deviceId);
+
+ return Tools.futureGetOrElse(
+ clusterCommunicator.sendAndReceive(
+ Pair.of(deviceId, appId),
+ PURGE_FLOW_RULES,
+ serializer::encode,
+ serializer::decode,
+ master),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ false);
+ }
+
+ @Override
public void purgeFlowRules() {
flowTable.purgeFlowRules();
}
@@ -874,6 +911,33 @@
}
/**
+ * Purges flow rules for the given device and application id.
+ *
+ * @param deviceId the device for which to purge flow rules
+ * @param appId the application id for with to purge flow rules
+ * @return true if purge is successful, false otherwise
+ */
+ public boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+ DeviceFlowTable flowTable = flowTables.get(deviceId);
+ if (flowTable != null) {
+ // flowTable.purge() returns a CompletableFuture<Void>, we want
+ // to return true when the completable future returns correctly
+ // within the timeout, otherwise return false.
+ try {
+ // Use higher timeout, purge(appId) may require more time
+ // than normal operations because it's applying the purge
+ // operation on every single flow table bucket.
+ flowTable.purge(appId).get(PURGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException | TimeoutException ignored) {
+ }
+ }
+ return false;
+ }
+
+ /**
* Purges all flow rules from the table.
*/
public void purgeFlowRules() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
index e5a7631..80f376e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
@@ -40,4 +40,7 @@
public static final MessageSubject FLOW_TABLE_BACKUP
= new MessageSubject("peer-flow-table-backup");
+
+ public static final MessageSubject PURGE_FLOW_RULES
+ = new MessageSubject("peer-purge-flow-rules");
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
index ea1887d..020559f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
@@ -21,6 +21,7 @@
import java.util.stream.Collectors;
import com.google.common.collect.Maps;
+import org.onosproject.core.ApplicationId;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
@@ -271,6 +272,24 @@
}
/**
+ * Purge the entries with the given application ID.
+ *
+ * @param appId the application ID
+ * @param term the term in which the purge occurred
+ * @param clock the logical clock
+ */
+ public void purge(ApplicationId appId, long term, LogicalClock clock) {
+ boolean anythingRemoved = flowBucket.values().removeIf(flowEntryMap -> {
+ flowEntryMap.values().removeIf(storedFlowEntry -> storedFlowEntry.appId() == appId.id());
+ return flowEntryMap.isEmpty();
+ });
+ if (anythingRemoved) {
+ recordUpdate(term, clock.getTimestamp());
+ }
+ }
+
+
+ /**
* Clears the bucket.
*/
public void clear() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 8fd32ab..8337e84 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -23,6 +23,7 @@
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
import org.onosproject.core.GroupId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
@@ -1093,6 +1094,18 @@
}
@Override
+ public void purgeGroupEntries(DeviceId deviceId, ApplicationId appId) {
+ Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entriesPendingRemove =
+ new HashSet<>();
+
+ getGroupStoreKeyMap().entrySet().stream()
+ .filter(entry -> entry.getKey().deviceId().equals(deviceId) && entry.getValue().appId().equals(appId))
+ .forEach(entriesPendingRemove::add);
+
+ purgeGroupEntries(entriesPendingRemove);
+ }
+
+ @Override
public void purgeGroupEntries() {
purgeGroupEntries(getGroupStoreKeyMap().entrySet());
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
index 38f8748..b2034dc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
@@ -22,6 +22,7 @@
import com.google.common.collect.Maps;
import org.apache.commons.lang.math.RandomUtils;
import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.behaviour.MeterQuery;
import org.onosproject.net.driver.DriverHandler;
@@ -440,17 +441,25 @@
@Override
public void purgeMeter(DeviceId deviceId) {
- // Purge api (typically used when the device is offline)
List<Versioned<MeterData>> metersPendingRemove = meters.stream()
.filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
- // Remove definitely the meter
metersPendingRemove.forEach(versionedMeterKey
-> purgeMeter(versionedMeterKey.value().meter()));
}
@Override
+ public void purgeMeters(DeviceId deviceId, ApplicationId appId) {
+ List<Versioned<MeterData>> metersPendingRemove = meters.stream()
+ .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId) &&
+ e.getValue().value().meter().appId().equals(appId))
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+ metersPendingRemove.forEach(versionedMeterKey -> deleteMeterNow(versionedMeterKey.value().meter()));
+ }
+
+ @Override
public long getMaxMeters(MeterFeaturesKey key) {
// Leverage the meter features to know the max id
// Create a Meter Table key with FeaturesKey's device and global scope