Fix for [ONOS-5824]
Changes:
- Redesigns DefaultSingleTablePipeline;
- Changes the timeout of test to reduce the false-negatives;
Change-Id: I15fa20fd8a15908e70bc22de7913367b9ef113c7
diff --git a/cli/src/main/java/org/onosproject/cli/net/IntentPushTestCommand.java b/cli/src/main/java/org/onosproject/cli/net/IntentPushTestCommand.java
index 2782b76..d6405f0 100644
--- a/cli/src/main/java/org/onosproject/cli/net/IntentPushTestCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/IntentPushTestCommand.java
@@ -165,7 +165,9 @@
}
try {
- if (latch.await(1000 + count * 30, TimeUnit.MILLISECONDS)) {
+ // In this way with the tests in place the timeout will be
+ // 61 seconds.
+ if (latch.await(1000 + count * 60, TimeUnit.MILLISECONDS)) {
printResults(count);
} else {
print("Failure: %d intents not installed", latch.getCount());
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
index a004fde..426f36c 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
@@ -41,13 +41,14 @@
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.net.group.DefaultGroupKey;
-import org.onosproject.net.group.GroupKey;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static org.onosproject.net.flowobjective.Objective.Operation.ADD;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -62,15 +63,26 @@
private FlowObjectiveStore flowObjectiveStore;
private DeviceId deviceId;
- private Cache<Integer, NextObjective> pendingNext;
-
private KryoNamespace appKryo = new KryoNamespace.Builder()
- .register(GroupKey.class)
- .register(DefaultGroupKey.class)
+ .register(KryoNamespaces.API)
.register(SingleGroup.class)
- .register(byte[].class)
.build("DefaultSingleTablePipeline");
+ // Fast path for the installation. If we don't find the nextobjective in
+ // the cache, as fallback mechanism we will try to retrieve the treatments
+ // from the store. It is safe to use this cache for the addition, while it
+ // should be avoided for the removal. This cache from Guava does not offer
+ // thread-safe read (the read does not take the lock).
+ private Cache<Integer, NextObjective> pendingAddNext = CacheBuilder.newBuilder()
+ .expireAfterWrite(20, TimeUnit.SECONDS)
+ .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
+ if (notification.getCause() == RemovalCause.EXPIRED) {
+ notification.getValue().context()
+ .ifPresent(c -> c.onError(notification.getValue(),
+ ObjectiveError.FLOWINSTALLATIONFAILED));
+ }
+ }).build();
+
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
@@ -80,15 +92,6 @@
flowRuleService = serviceDirectory.get(FlowRuleService.class);
flowObjectiveStore = serviceDirectory.get(FlowObjectiveStore.class);
- pendingNext = CacheBuilder.newBuilder()
- .expireAfterWrite(20, TimeUnit.SECONDS)
- .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
- if (notification.getCause() == RemovalCause.EXPIRED) {
- notification.getValue().context()
- .ifPresent(c -> c.onError(notification.getValue(),
- ObjectiveError.FLOWINSTALLATIONFAILED));
- }
- }).build();
}
@Override
@@ -140,7 +143,6 @@
@Override
public void forward(ForwardingObjective fwd) {
TrafficSelector selector = fwd.selector();
-
if (fwd.treatment() != null) {
// Deal with SPECIFIC and VERSATILE in the same manner.
FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
@@ -158,28 +160,56 @@
installObjective(ruleBuilder, fwd);
} else {
- NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId());
- if (nextObjective != null) {
- pendingNext.invalidate(fwd.nextId());
- nextObjective.next().forEach(treat -> {
- FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
- .forDevice(deviceId)
- .withSelector(selector)
- .fromApp(fwd.appId())
- .withPriority(fwd.priority())
- .withTreatment(treat);
-
- if (fwd.permanent()) {
- ruleBuilder.makePermanent();
- } else {
- ruleBuilder.makeTemporary(fwd.timeout());
+ NextObjective nextObjective;
+ NextGroup next;
+ TrafficTreatment treatment;
+ if (fwd.op() == ADD) {
+ // Give a try to the cache. Doing an operation
+ // on the store seems to be very expensive.
+ nextObjective = pendingAddNext.getIfPresent(fwd.nextId());
+ // If the next objective is not present
+ // We will try with the store
+ if (nextObjective == null) {
+ next = flowObjectiveStore.getNextGroup(fwd.nextId());
+ // We verify that next was in the store and then de-serialize
+ // the treatment in order to re-build the flow rule.
+ if (next == null) {
+ fwd.context().ifPresent(c -> c.onError(fwd, ObjectiveError.GROUPMISSING));
+ return;
}
- installObjective(ruleBuilder, fwd);
- });
+ treatment = appKryo.deserialize(next.data());
+ } else {
+ pendingAddNext.invalidate(fwd.nextId());
+ treatment = nextObjective.next().iterator().next();
+ }
} else {
- fwd.context().ifPresent(c -> c.onError(fwd,
- ObjectiveError.GROUPMISSING));
+ // We get the NextGroup from the remove operation.
+ // Doing an operation on the store seems to be very expensive.
+ next = flowObjectiveStore.removeNextGroup(fwd.nextId());
+ if (next == null) {
+ fwd.context().ifPresent(c -> c.onError(fwd, ObjectiveError.GROUPMISSING));
+ return;
+ }
+ treatment = appKryo.deserialize(next.data());
}
+ // If the treatment is null we cannot re-build the original flow
+ if (treatment == null) {
+ fwd.context().ifPresent(c -> c.onError(fwd, ObjectiveError.GROUPMISSING));
+ return;
+ }
+ // Finally we build the flow rule and push to the flowrule subsystem.
+ FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
+ .forDevice(deviceId)
+ .withSelector(selector)
+ .fromApp(fwd.appId())
+ .withPriority(fwd.priority())
+ .withTreatment(treatment);
+ if (fwd.permanent()) {
+ ruleBuilder.makePermanent();
+ } else {
+ ruleBuilder.makeTemporary(fwd.timeout());
+ }
+ installObjective(ruleBuilder, fwd);
}
}
@@ -213,34 +243,45 @@
@Override
public void next(NextObjective nextObjective) {
-
- pendingNext.put(nextObjective.id(), nextObjective);
- flowObjectiveStore.putNextGroup(nextObjective.id(),
- new SingleGroup(new DefaultGroupKey(appKryo.serialize(nextObjective.id()))));
+ switch (nextObjective.op()) {
+ case ADD:
+ // We insert the value in the cache
+ pendingAddNext.put(nextObjective.id(), nextObjective);
+ // Then in the store, this will unblock the queued fwd obj
+ flowObjectiveStore.putNextGroup(
+ nextObjective.id(),
+ new SingleGroup(nextObjective.next().iterator().next())
+ );
+ break;
+ case REMOVE:
+ break;
+ default:
+ log.warn("Unsupported operation {}", nextObjective.op());
+ }
nextObjective.context().ifPresent(context -> context.onSuccess(nextObjective));
}
@Override
public List<String> getNextMappings(NextGroup nextGroup) {
// Default single table pipeline does not use nextObjectives or groups
- return null;
+ return Collections.emptyList();
}
private class SingleGroup implements NextGroup {
- private final GroupKey key;
+ private TrafficTreatment nextActions;
- public SingleGroup(GroupKey key) {
- this.key = key;
- }
-
- public GroupKey key() {
- return key;
+ SingleGroup(TrafficTreatment next) {
+ this.nextActions = next;
}
@Override
public byte[] data() {
- return appKryo.serialize(key);
+ return appKryo.serialize(nextActions);
+ }
+
+ public TrafficTreatment treatment() {
+ return nextActions;
}
}