CORD-414 Edit the buckets of existing hash groups in the OF-DPA driver
instead of creating new groups.
Also in this commit: a fix for an NPE in the groups CLI, and removal of the
unnecessary cpqd-ofdpa3 driver.
Change-Id: I2a5dd183cb38ed901caa5a806791b77e9d92d93c
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
index a72eea1..4600eb8 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
@@ -15,6 +15,8 @@
*/
package org.onosproject.driver.pipeline;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayDeque;
@@ -26,6 +28,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -131,6 +135,11 @@
protected Set<IPCriterion> sentIpFilters = Collections.newSetFromMap(
new ConcurrentHashMap<>());
+    // executor and limits used to retry flow installations
+ protected ScheduledExecutorService executorService
+ = newScheduledThreadPool(5, groupedThreads("OfdpaPipeliner", "retry-%d", log));
+ protected static final int MAX_RETRY_ATTEMPTS = 10;
+ protected static final int RETRY_MS = 1000;
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
@@ -181,30 +190,33 @@
@Override
public void forward(ForwardingObjective fwd) {
- Collection<FlowRule> rules;
- FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
-
- rules = processForward(fwd);
+ Collection<FlowRule> rules = processForward(fwd);
if (rules == null || rules.isEmpty()) {
// Assumes fail message has already been generated to the objective
// context. Returning here prevents spurious pass message to be
// generated by FlowRule service for empty flowOps.
return;
}
+ sendForward(fwd, rules);
+ }
+
+ protected void sendForward(ForwardingObjective fwd, Collection<FlowRule> rules) {
+ FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
switch (fwd.op()) {
- case ADD:
- rules.stream()
- .filter(Objects::nonNull)
- .forEach(flowOpsBuilder::add);
- break;
- case REMOVE:
- rules.stream()
- .filter(Objects::nonNull)
- .forEach(flowOpsBuilder::remove);
- break;
- default:
- fail(fwd, ObjectiveError.UNKNOWN);
- log.warn("Unknown forwarding type {}", fwd.op());
+ case ADD:
+ rules.stream()
+ .filter(Objects::nonNull)
+ .forEach(flowOpsBuilder::add);
+ log.info("Applying a flow rule to sw:{}", deviceId);
+ break;
+ case REMOVE:
+ rules.stream()
+ .filter(Objects::nonNull)
+ .forEach(flowOpsBuilder::remove);
+ break;
+ default:
+ fail(fwd, ObjectiveError.UNKNOWN);
+ log.warn("Unknown forwarding type {}", fwd.op());
}
flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
@@ -802,24 +814,6 @@
}
}
- private boolean isSupportedEthTypeObjective(ForwardingObjective fwd) {
- TrafficSelector selector = fwd.selector();
- EthTypeCriterion ethType = (EthTypeCriterion) selector
- .getCriterion(Criterion.Type.ETH_TYPE);
- return !((ethType == null) ||
- ((ethType.ethType().toShort() != Ethernet.TYPE_IPV4) &&
- (ethType.ethType().toShort() != Ethernet.MPLS_UNICAST)));
- }
-
- private boolean isSupportedEthDstObjective(ForwardingObjective fwd) {
- TrafficSelector selector = fwd.selector();
- EthCriterion ethDst = (EthCriterion) selector
- .getCriterion(Criterion.Type.ETH_DST);
- VlanIdCriterion vlanId = (VlanIdCriterion) selector
- .getCriterion(Criterion.Type.VLAN_VID);
- return !(ethDst == null && vlanId == null);
- }
-
/**
* Handles forwarding rules to the IP and MPLS tables.
*
@@ -850,6 +844,7 @@
(EthTypeCriterion) selector.getCriterion(Criterion.Type.ETH_TYPE);
boolean defaultRule = false;
boolean popMpls = false;
+ boolean emptyGroup = false;
int forTableId;
TrafficSelector.Builder filteredSelector = DefaultTrafficSelector.builder();
TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder();
@@ -967,6 +962,12 @@
return Collections.emptySet();
}
tb.deferred().group(group.id());
+ // check if group is empty
+ if (gkeys.size() == 1 && gkeys.get(0).size() == 1) {
+ log.warn("Found empty group 0x{} in dev:{} .. will retry fwd:{}",
+ Integer.toHexString(group.id().id()), deviceId, fwd.id());
+ emptyGroup = true;
+ }
} else {
log.warn("Cannot find group for nextId:{} in dev:{}. Aborting fwd:{}",
fwd.nextId(), deviceId, fwd.id());
@@ -1016,7 +1017,11 @@
flowRuleCollection.add(rule.build());
log.debug("Default rule 0.0.0.0/0 is being installed two rules");
}
-
+ // XXX retrying flows may be necessary due to bug CORD-554
+ if (emptyGroup) {
+ executorService.schedule(new RetryFlows(fwd, flowRuleCollection),
+ RETRY_MS, TimeUnit.MILLISECONDS);
+ }
return flowRuleCollection;
}
@@ -1103,6 +1108,28 @@
return rules;
}
+ //////////////////////////////////////
+ // Helper Methods and Classes
+ //////////////////////////////////////
+
+    private boolean isSupportedEthTypeObjective(ForwardingObjective fwd) {
+        // supported only if an ETH_TYPE criterion is present and is IPv4 or MPLS unicast
+        EthTypeCriterion criterion =
+                (EthTypeCriterion) fwd.selector().getCriterion(Criterion.Type.ETH_TYPE);
+        return criterion != null
+                && (criterion.ethType().toShort() == Ethernet.TYPE_IPV4
+                || criterion.ethType().toShort() == Ethernet.MPLS_UNICAST);
+    }
+
+    private boolean isSupportedEthDstObjective(ForwardingObjective fwd) {
+        // supported if the selector carries an ETH_DST or a VLAN_VID criterion (or both)
+        TrafficSelector match = fwd.selector();
+        EthCriterion dstMac = (EthCriterion) match.getCriterion(Criterion.Type.ETH_DST);
+        VlanIdCriterion vlan =
+                (VlanIdCriterion) match.getCriterion(Criterion.Type.VLAN_VID);
+        return dstMac != null || vlan != null;
+    }
+
protected NextGroup getGroupForNextObjective(Integer nextId) {
NextGroup next = flowObjectiveStore.getNextGroup(nextId);
if (next != null) {
@@ -1147,7 +1174,7 @@
}
// add port information for last group in group-chain
List<Instruction> lastGroupIns = new ArrayList<Instruction>();
- if (lastGroup != null) {
+ if (lastGroup != null && !lastGroup.buckets().buckets().isEmpty()) {
lastGroupIns = lastGroup.buckets().buckets().get(0)
.treatment().allInstructions();
}
@@ -1189,4 +1216,30 @@
}
return null;
}
+
+    /**
+     * Re-sends the flows of one forwarding objective MAX_RETRY_ATTEMPTS times,
+     * RETRY_MS ms apart, even if earlier attempts succeed. Forwarding objectives only.
+     */
+    protected final class RetryFlows implements Runnable {
+        private int attempts = MAX_RETRY_ATTEMPTS; // runs still owed, counts down to 0
+        private final Collection<FlowRule> retryFlows;
+        private final ForwardingObjective fwd;
+
+        RetryFlows(ForwardingObjective fwd, Collection<FlowRule> retryFlows) {
+            this.fwd = fwd;
+            this.retryFlows = retryFlows;
+        }
+
+        @Override
+        public void run() {
+            log.info("RETRY FLOWS ATTEMPT# {} for fwd:{} rules:{}",
+                     MAX_RETRY_ATTEMPTS - attempts, fwd.id(), retryFlows.size());
+            sendForward(fwd, retryFlows);
+            if (--attempts > 0) { // re-queue this same task until no runs are owed
+                executorService.schedule(this, RETRY_MS, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
}