CORD-1180 Collection of fixes for hash-group buckets. Required the following changes:
Next-objectives that edited groups are now queued in the FlowObjectiveManager instead of the driver.
During linkup immediately checking for previous portups that should be added to a hash group.
A final retry 30 secs later to catch all ports that should be part of the same hash group.
Change-Id: I7ef450149d685890ca47932b8e559a0c11dc5ab4
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 47b63ed..f51ee7c 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -48,6 +48,7 @@
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.Objective.Operation;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.flowobjective.ObjectiveEvent;
import org.onosproject.net.flowobjective.ObjectiveEvent.Type;
@@ -126,7 +127,14 @@
protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
- private final Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
+ // local stores for queuing fwd and next objectives that are waiting for an
+ // associated next objective execution to complete. The signal for completed
+ // execution comes from a pipeline driver, in this or another controller
+ // instance, via the DistributedFlowObjectiveStore.
+ private final Map<Integer, Set<PendingFlowObjective>> pendingForwards =
+ Maps.newConcurrentMap();
+ private final Map<Integer, Set<PendingFlowObjective>> pendingNexts =
+ Maps.newConcurrentMap();
// local store to track which nextObjectives were sent to which device
// for debugging purposes
@@ -202,6 +210,7 @@
if (pipeliner != null) {
if (objective instanceof NextObjective) {
+ nextToDevice.put(objective.id(), deviceId);
pipeliner.next((NextObjective) objective);
} else if (objective instanceof ForwardingObjective) {
pipeliner.forward((ForwardingObjective) objective);
@@ -218,7 +227,7 @@
objective.context().ifPresent(
c -> c.onError(objective, ObjectiveError.NOPIPELINER));
}
- //Excpetion thrown
+ //Exception thrown
} catch (Exception e) {
log.warn("Exception while installing flow objective", e);
}
@@ -234,7 +243,7 @@
@Override
public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
checkPermission(FLOWRULE_WRITE);
- if (queueObjective(deviceId, forwardingObjective)) {
+ if (queueFwdObjective(deviceId, forwardingObjective)) {
return;
}
executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
@@ -243,7 +252,9 @@
@Override
public void next(DeviceId deviceId, NextObjective nextObjective) {
checkPermission(FLOWRULE_WRITE);
- nextToDevice.put(nextObjective.id(), deviceId);
+ if (queueNextObjective(deviceId, nextObjective)) {
+ return;
+ }
executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
}
@@ -256,7 +267,7 @@
@Override
public void initPolicy(String policy) {}
- private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
+ private boolean queueFwdObjective(DeviceId deviceId, ForwardingObjective fwd) {
if (fwd.nextId() == null ||
flowObjectiveStore.getNextGroup(fwd.nextId()) != null ||
fwd.op() == Objective.Operation.REMOVE) {
@@ -269,11 +280,11 @@
// after a notification arrives
if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
pendingForwards.compute(fwd.nextId(), (id, pending) -> {
- PendingNext next = new PendingNext(deviceId, fwd);
+ PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, fwd);
if (pending == null) {
- return Sets.newHashSet(next);
+ return Sets.newHashSet(pendfo);
} else {
- pending.add(next);
+ pending.add(pendfo);
return pending;
}
});
@@ -287,6 +298,38 @@
return queued;
}
+ private boolean queueNextObjective(DeviceId deviceId, NextObjective next) {
+ if (flowObjectiveStore.getNextGroup(next.id()) != null ||
+ next.op() == Operation.ADD) {
+ // either group exists or we are trying to create it - let it through
+ return false;
+ }
+ // we need to hold off on other operations till we get notified that the
+ // initial group creation has succeeded
+ boolean queued = false;
+ synchronized (pendingNexts) {
+ // double check the flow objective store, because this block could run
+ // after a notification arrives
+ if (flowObjectiveStore.getNextGroup(next.id()) == null) {
+ pendingNexts.compute(next.id(), (id, pending) -> {
+ PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, next);
+ if (pending == null) {
+ return Sets.newHashSet(pendfo);
+ } else {
+ pending.add(pendfo);
+ return pending;
+ }
+ });
+ queued = true;
+ }
+ }
+ if (queued) {
+ log.debug("Queued next objective {} with operation {} meant for device {}",
+ next.id(), next.op(), deviceId);
+ }
+ return queued;
+ }
+
/**
* Retrieves (if it exists) the device pipeline behaviour from the cache.
* Otherwise it warms the caches and triggers the init method of the Pipeline.
@@ -449,49 +492,70 @@
public void notify(ObjectiveEvent event) {
if (event.type() == Type.ADD) {
log.debug("Received notification of obj event {}", event);
- Set<PendingNext> pending;
+ Set<PendingFlowObjective> pending;
+
+ // first send all pending flows
synchronized (pendingForwards) {
// needs to be synchronized for queueObjective lookup
pending = pendingForwards.remove(event.subject());
}
-
if (pending == null) {
- log.debug("Nothing pending for this obj event {}", event);
- return;
+ log.debug("No forwarding objectives pending for this "
+ + "obj event {}", event);
+ } else {
+ log.debug("Processing {} pending forwarding objectives for nextId {}",
+ pending.size(), event.subject());
+ pending.forEach(p -> getDevicePipeliner(p.deviceId())
+ .forward((ForwardingObjective) p.flowObjective()));
}
- log.debug("Processing {} pending forwarding objectives for nextId {}",
- pending.size(), event.subject());
- pending.forEach(p -> getDevicePipeliner(p.deviceId())
- .forward(p.forwardingObjective()));
+ // now check for pending next-objectives
+ synchronized (pendingNexts) {
+ // needs to be synchronized for queueObjective lookup
+ pending = pendingNexts.remove(event.subject());
+ }
+ if (pending == null) {
+ log.debug("No next objectives pending for this "
+ + "obj event {}", event);
+ } else {
+ log.debug("Processing {} pending next objectives for nextId {}",
+ pending.size(), event.subject());
+ pending.forEach(p -> getDevicePipeliner(p.deviceId())
+ .next((NextObjective) p.flowObjective()));
+ }
}
}
}
/**
- * Data class used to hold a pending forwarding objective that could not
+ * Data class used to hold a pending flow objective that could not
* be processed because the associated next object was not present.
+ * Note that this pending flow objective could be a forwarding objective
+ * waiting for a next objective to complete execution. Or it could a
+ * next objective (with a different operation - remove, addToExisting, or
+ * removeFromExisting) waiting for a next objective with the same id to
+ * complete execution.
*/
- private class PendingNext {
+ private class PendingFlowObjective {
private final DeviceId deviceId;
- private final ForwardingObjective fwd;
+ private final Objective flowObj;
- public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
+ public PendingFlowObjective(DeviceId deviceId, Objective flowObj) {
this.deviceId = deviceId;
- this.fwd = fwd;
+ this.flowObj = flowObj;
}
public DeviceId deviceId() {
return deviceId;
}
- public ForwardingObjective forwardingObjective() {
- return fwd;
+ public Objective flowObjective() {
+ return flowObj;
}
@Override
public int hashCode() {
- return Objects.hash(deviceId, fwd);
+ return Objects.hash(deviceId, flowObj);
}
@Override
@@ -499,12 +563,12 @@
if (this == obj) {
return true;
}
- if (!(obj instanceof PendingNext)) {
+ if (!(obj instanceof PendingFlowObjective)) {
return false;
}
- final PendingNext other = (PendingNext) obj;
+ final PendingFlowObjective other = (PendingFlowObjective) obj;
if (this.deviceId.equals(other.deviceId) &&
- this.fwd.equals(other.fwd)) {
+ this.flowObj.equals(other.flowObj)) {
return true;
}
return false;
@@ -541,24 +605,48 @@
}
@Override
- public List<String> getPendingNexts() {
- List<String> pendingNexts = new ArrayList<>();
- for (Integer nextId : pendingForwards.keySet()) {
- Set<PendingNext> pnext = pendingForwards.get(nextId);
+ public List<String> getPendingFlowObjectives() {
+ List<String> pendingFlowObjectives = new ArrayList<>();
+ for (Integer nextId : pendingForwards.keySet()) {
+ Set<PendingFlowObjective> pfwd = pendingForwards.get(nextId);
StringBuilder pend = new StringBuilder();
pend.append("NextId: ")
.append(nextId);
- for (PendingNext pn : pnext) {
+ for (PendingFlowObjective pf : pfwd) {
pend.append("\n FwdId: ")
- .append(String.format("%11s", pn.forwardingObjective().id()))
+ .append(String.format("%11s", pf.flowObjective().id()))
+ .append(", DeviceId: ")
+ .append(pf.deviceId())
+ .append(", Selector: ")
+ .append(((ForwardingObjective) pf.flowObjective())
+ .selector().criteria());
+ }
+ pendingFlowObjectives.add(pend.toString());
+ }
+
+ for (Integer nextId : pendingNexts.keySet()) {
+ Set<PendingFlowObjective> pnext = pendingNexts.get(nextId);
+ StringBuilder pend = new StringBuilder();
+ pend.append("NextId: ")
+ .append(nextId);
+ for (PendingFlowObjective pn : pnext) {
+ pend.append("\n NextOp: ")
+ .append(pn.flowObjective().op())
.append(", DeviceId: ")
.append(pn.deviceId())
- .append(", Selector: ")
- .append(pn.forwardingObjective().selector().criteria());
+ .append(", Treatments: ")
+ .append(((NextObjective) pn.flowObjective())
+ .next());
}
- pendingNexts.add(pend.toString());
+ pendingFlowObjectives.add(pend.toString());
}
- return pendingNexts;
+
+ return pendingFlowObjectives;
+ }
+
+ @Override
+ public List<String> getPendingNexts() {
+ return getPendingFlowObjectives();
}
}
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/composition/FlowObjectiveCompositionManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/composition/FlowObjectiveCompositionManager.java
index 3e0d2eb..60da0e6 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/composition/FlowObjectiveCompositionManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/composition/FlowObjectiveCompositionManager.java
@@ -432,8 +432,13 @@
}
@Override
- public List<String> getPendingNexts() {
+ public List<String> getPendingFlowObjectives() {
// TODO Implementation deferred as this is an experimental component.
return ImmutableList.of();
}
+
+ @Override
+ public List<String> getPendingNexts() {
+ return ImmutableList.of();
+ }
}