[ONOS-6016] Enhance IntentInstaller for FlowObjective
Note:
Provide order of objective context (for remove only)
Handling objective errors in intent installer
Change-Id: I50bb9d7a17a0ae71d22ba035cd5bc80f485ec45a
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java
index bd51902..e00f2b8 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/ObjectiveError.java
@@ -66,5 +66,20 @@
/**
* An unknown error occurred.
*/
- UNKNOWN
+ UNKNOWN,
+
+ /**
+ * Flow/Group installation retry threshold exceeded.
+ */
+ INSTALLATIONTHRESHOLDEXCEEDED,
+
+ /**
+ * Installation timeout.
+ */
+ INSTALLATIONTIMEOUT,
+
+ /**
+ * Group already exists.
+ */
+ GROUPEXISTS
}
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 44fc986..37bf9d2 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -265,7 +265,8 @@
private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
if (fwd.nextId() == null ||
- flowObjectiveStore.getNextGroup(fwd.nextId()) != null) {
+ flowObjectiveStore.getNextGroup(fwd.nextId()) != null ||
+ fwd.op() == Objective.Operation.REMOVE) {
// fast path
return false;
}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
index 0fd57a9..538aa0e 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
@@ -32,7 +32,10 @@
import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleOperationsContext;
import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
@@ -53,12 +56,14 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import static org.onosproject.net.flowobjective.ObjectiveError.INSTALLATIONTHRESHOLDEXCEEDED;
import static org.onosproject.net.intent.IntentState.*;
import static org.slf4j.LoggerFactory.getLogger;
@@ -68,6 +73,7 @@
class IntentInstaller {
private static final Logger log = getLogger(IntentInstaller.class);
+ private static final long OBJECTIVE_RETRY_THRESHOLD = 5;
private IntentStore store;
private ObjectiveTrackerService trackerService;
@@ -516,8 +522,13 @@
// Context for applying and tracking operations related to flow objective intents.
private class FlowObjectiveOperationContext extends OperationContext {
- List<FlowObjectiveInstallationContext> contexts = Lists.newLinkedList();
- final Set<ObjectiveContext> pendingContexts = Sets.newHashSet();
+ private static final String UNSUPPORT_OBJ = "unsupported objective {}";
+ final List<ObjectiveContext> contexts = Lists.newArrayList();
+
+ final Set<ObjectiveContext> pendingContexts = Sets.newConcurrentHashSet();
+
+ // Second stage of pending contexts
+ final Set<ObjectiveContext> nextPendingContexts = Sets.newConcurrentHashSet();
final Set<ObjectiveContext> errorContexts = Sets.newConcurrentHashSet();
FlowObjectiveOperationContext(IntentInstallationContext context) {
@@ -526,46 +537,127 @@
@Override
public void prepareIntents(List<Intent> intentsToApply, Direction direction) {
- intentsToApply.stream()
- .filter(x -> x instanceof FlowObjectiveIntent)
- .flatMap(x -> buildObjectiveContexts((FlowObjectiveIntent) x, direction).stream())
- .forEach(contexts::add);
+ intentsToApply
+ .stream()
+ .filter(intent -> intent instanceof FlowObjectiveIntent)
+ .map(intent -> buildObjectiveContexts((FlowObjectiveIntent) intent, direction))
+ .flatMap(Collection::stream)
+ .forEach(contexts::add);
+
+ // Two stage for different direction context
+ // We will apply REMOVE context first, and apply ADD context.
+ contexts.forEach(context -> {
+ switch (direction) {
+ case REMOVE:
+ pendingContexts.add(context);
+ break;
+ case ADD:
+ nextPendingContexts.add(context);
+ break;
+ default:
+ break;
+ }
+ });
}
// Builds the specified objective in the appropriate direction
- private List<FlowObjectiveInstallationContext> buildObjectiveContexts(FlowObjectiveIntent intent,
- Direction direction) {
+ private Set<? extends ObjectiveContext> buildObjectiveContexts(FlowObjectiveIntent intent,
+ Direction direction) {
+ Set<FlowObjectiveInstallationContext> contexts = Sets.newHashSet();
int size = intent.objectives().size();
- List<FlowObjectiveInstallationContext> contexts = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- DeviceId deviceId = intent.devices().get(i);
- Objective.Builder builder = intent.objectives().get(i).copy();
- FlowObjectiveInstallationContext context = new FlowObjectiveInstallationContext();
+ List<Objective> objectives = intent.objectives();
+ List<DeviceId> deviceIds = intent.devices();
- final Objective objective;
- switch (direction) {
- case ADD:
- objective = builder.add(context);
- break;
- case REMOVE:
- objective = builder.remove(context);
- break;
- default:
- throw new UnsupportedOperationException("Unsupported direction " + direction);
+ if (direction == Direction.ADD) {
+ for (int i = 0; i < size; i++) {
+ Objective objective = objectives.get(i);
+ DeviceId deviceId = deviceIds.get(i);
+ FlowObjectiveInstallationContext ctx =
+ buildObjectiveContext(objective, deviceId, direction);
+ contexts.add(ctx);
}
- context.setObjective(objective, deviceId);
- contexts.add(context);
+ return contexts;
+ } else {
+ // we need to care about ordering here
+ // basic idea is to chain objective contexts
+ for (int i = 0; i < size; i++) {
+ Objective objective = intent.objectives().get(i);
+ DeviceId deviceId = intent.devices().get(i);
+
+ if (objective instanceof FilteringObjective) {
+ // don't need to care ordering of filtering objective
+ FlowObjectiveInstallationContext ctx =
+ buildObjectiveContext(objective, deviceId, direction);
+ contexts.add(ctx);
+ } else if (objective instanceof NextObjective) {
+ // need to removed after forwarding objective
+ // nothing to do here
+ } else if (objective instanceof ForwardingObjective) {
+ // forwarding objective, also find next objective if
+ // exist
+ FlowObjectiveInstallationContext fwdCtx =
+ buildObjectiveContext(objective, deviceId, direction);
+ ForwardingObjective fwd = (ForwardingObjective) objective;
+ NextObjective nxt = null;
+ Integer nextId = fwd.nextId();
+ if (nextId != null) {
+ for (int j = 0; j < size; j++) {
+ if (objectives.get(j).id() == nextId) {
+ nxt = (NextObjective) objectives.get(j);
+ break;
+ }
+ }
+ // if a next objective exists in the Intent
+ if (nxt != null) {
+ FlowObjectiveInstallationContext nxtCtx =
+ buildObjectiveContext(nxt, deviceId, direction);
+ fwdCtx.nextContext(nxtCtx);
+ }
+ }
+ contexts.add(fwdCtx);
+ } else {
+ // possible here?
+ log.warn(UNSUPPORT_OBJ, objective);
+ }
+ }
}
return contexts;
}
+ private FlowObjectiveInstallationContext buildObjectiveContext(Objective objective,
+ DeviceId deviceId,
+ Direction direction) {
+ Objective.Builder builder = objective.copy();
+ FlowObjectiveInstallationContext ctx = new FlowObjectiveInstallationContext();
+ switch (direction) {
+ case ADD:
+ objective = builder.add(ctx);
+ break;
+ case REMOVE:
+ objective = builder.remove(ctx);
+ break;
+ default:
+ break;
+ }
+ ctx.setObjective(objective, deviceId);
+ return ctx;
+ }
+
@Override
void apply() {
- pendingContexts.addAll(contexts);
- contexts.forEach(objectiveContext ->
- flowObjectiveService.apply(objectiveContext.deviceId,
- objectiveContext.objective)
- );
+ // If there is no pending contexts, try apply second stage
+ // pending contexts
+ if (pendingContexts.isEmpty()) {
+ pendingContexts.addAll(nextPendingContexts);
+ nextPendingContexts.clear();
+ }
+ final Set<ObjectiveContext> contextsToApply = Sets.newHashSet(pendingContexts);
+ contextsToApply.forEach(ctx -> {
+ FlowObjectiveInstallationContext foiCtx =
+ (FlowObjectiveInstallationContext) ctx;
+
+ flowObjectiveService.apply(foiCtx.deviceId, foiCtx.objective);
+ });
}
@Override
@@ -581,46 +673,156 @@
.add("errorContexts", errorContexts);
}
-
private class FlowObjectiveInstallationContext implements ObjectiveContext {
Objective objective;
DeviceId deviceId;
ObjectiveError error;
+ AtomicInteger retry;
+ FlowObjectiveInstallationContext nextContext;
void setObjective(Objective objective, DeviceId deviceId) {
+ // init function
this.objective = objective;
this.deviceId = deviceId;
+ this.error = null;
+ this.retry = new AtomicInteger(0);
+ this.nextContext = null;
}
- @Override
- public void onSuccess(Objective objective) {
- finish();
+ int retryTimes() {
+ return this.retry.get();
}
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- this.error = error;
- errorContexts.add(this);
- finish();
+ void increaseRetryValue() {
+ this.retry.incrementAndGet();
}
- private void finish() {
+ private void finished(ObjectiveError error) {
+
synchronized (pendingContexts) {
- pendingContexts.remove(this);
- if (pendingContexts.isEmpty()) {
- if (errorContexts.isEmpty()) {
- successConsumer.accept(FlowObjectiveOperationContext.this);
+ if (error != null) {
+ this.error = error;
+ handleObjectiveError(this, error);
+ } else {
+ // apply next context if exist
+ if (nextContext != null) {
+ pendingContexts.add(nextContext);
+ flowObjectiveService.apply(nextContext.deviceId,
+ nextContext.objective);
+ pendingContexts.remove(this);
} else {
- errorConsumer.accept(FlowObjectiveOperationContext.this);
+ pendingContexts.remove(this);
}
}
+ if (!pendingContexts.isEmpty()) {
+ return;
+ }
+ // Apply second stage pending contexts if it is not empty
+ if (!nextPendingContexts.isEmpty()) {
+ pendingContexts.addAll(nextPendingContexts);
+ nextPendingContexts.clear();
+ final Set<ObjectiveContext> contextsToApply =
+ Sets.newHashSet(pendingContexts);
+ contextsToApply.forEach(ctx -> {
+ FlowObjectiveInstallationContext foiCtx =
+ (FlowObjectiveInstallationContext) ctx;
+ flowObjectiveService.apply(foiCtx.deviceId,
+ foiCtx.objective);
+ });
+ return;
+ }
+ if (errorContexts.isEmpty()) {
+ successConsumer.accept(FlowObjectiveOperationContext.this);
+ } else {
+ errorConsumer.accept(FlowObjectiveOperationContext.this);
+ }
}
}
@Override
+ public void onSuccess(Objective objective) {
+ finished(null);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ finished(error);
+ }
+
+ @Override
public String toString() {
return String.format("(%s on %s for %s)", error, deviceId, objective);
}
+
+ public void nextContext(FlowObjectiveInstallationContext nextContext) {
+ this.nextContext = nextContext;
+ }
+ }
+
+ private void handleObjectiveError(FlowObjectiveInstallationContext ctx,
+ ObjectiveError error) {
+ log.debug("Got error(s) when install objective: {}, error: {}, retry: {}",
+ ctx.objective, ctx.error, ctx.retry);
+ if (ctx.retryTimes() > OBJECTIVE_RETRY_THRESHOLD) {
+ ctx.error = INSTALLATIONTHRESHOLDEXCEEDED;
+ errorContexts.add(ctx);
+ return;
+ }
+ // reset error
+ ctx.error = null;
+ // strategies for errors
+ switch (error) {
+ case GROUPEXISTS:
+ if (ctx.objective.op() == Objective.Operation.ADD) {
+ // Next group exists
+ // build new objective with new op ADD_TO_EXIST
+ NextObjective newObj =
+ ((NextObjective.Builder) ctx.objective.copy()).addToExisting(ctx);
+ ctx.setObjective(newObj, ctx.deviceId);
+ ctx.increaseRetryValue();
+ flowObjectiveService.apply(ctx.deviceId, ctx.objective);
+ } else {
+ pendingContexts.remove(ctx);
+ errorContexts.add(ctx);
+ }
+ break;
+ case GROUPINSTALLATIONFAILED:
+ // Group install failed, retry again
+ ctx.increaseRetryValue();
+ flowObjectiveService.apply(ctx.deviceId, ctx.objective);
+ break;
+ case GROUPMISSING:
+ if (ctx.objective.op() == Objective.Operation.ADD_TO_EXISTING) {
+ // Next group not exist, but we want to add new buckets
+ // build new objective with new op ADD
+ NextObjective newObj = (NextObjective) ctx.objective.copy().add(ctx);
+ ctx.setObjective(newObj, ctx.deviceId);
+ ctx.increaseRetryValue();
+ flowObjectiveService.apply(ctx.deviceId, ctx.objective);
+ } else if (ctx.objective.op() == Objective.Operation.REMOVE ||
+ ctx.objective.op() == Objective.Operation.REMOVE_FROM_EXISTING) {
+ // Already removed, no need to do anything
+ ctx.error = null;
+ pendingContexts.remove(ctx);
+ return;
+ } else {
+ // Next chaining group missing, try again.
+ ctx.increaseRetryValue();
+ flowObjectiveService.apply(ctx.deviceId, ctx.objective);
+ }
+ break;
+ case FLOWINSTALLATIONFAILED:
+ case GROUPREMOVALFAILED:
+ case INSTALLATIONTIMEOUT:
+ // Retry
+ ctx.increaseRetryValue();
+ flowObjectiveService.apply(ctx.deviceId, ctx.objective);
+ break;
+ default:
+ pendingContexts.remove(ctx);
+ errorContexts.add(ctx);
+ break;
+ }
}
}