fix: (vNet) reflect changes of flow objective service
To reflect changes for ONOS-6476.
Paired with core flow objective service.
Change-Id: I67c323fe6863176ac2b8ca73774d1ee7261b69c0
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java
index b28a34e..36b286d 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java
@@ -102,7 +102,15 @@
private final PipelinerContext context = new InnerPipelineContext();
private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
- private final Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
+
+ // local stores for queuing fwd and next objectives that are waiting for an
+ // associated next objective execution to complete. The signal for completed
+ // execution comes from a pipeline driver, in this or another controller
+ // instance, via the DistributedFlowObjectiveStore.
+ private final Map<Integer, Set<PendingFlowObjective>> pendingForwards =
+ Maps.newConcurrentMap();
+ private final Map<Integer, Set<PendingFlowObjective>> pendingNexts =
+ Maps.newConcurrentMap();
// local store to track which nextObjectives were sent to which device
// for debugging purposes
@@ -133,16 +141,24 @@
@Override
public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
- if (queueObjective(deviceId, forwardingObjective)) {
- return;
+ if (forwardingObjective.nextId() == null ||
+ forwardingObjective.op() == Objective.Operation.REMOVE ||
+ flowObjectiveStore.getNextGroup(forwardingObjective.nextId()) != null ||
+ !queueFwdObjective(deviceId, forwardingObjective)) {
+ // fast path
+ executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
}
- executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
}
@Override
public void next(DeviceId deviceId, NextObjective nextObjective) {
nextToDevice.put(nextObjective.id(), deviceId);
- executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
+ if (nextObjective.op() == Objective.Operation.ADD ||
+ flowObjectiveStore.getNextGroup(nextObjective.id()) != null ||
+ !queueNextObjective(deviceId, nextObjective)) {
+ // either group exists or we are trying to create it - let it through
+ executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
+ }
}
@Override
@@ -186,19 +202,43 @@
@Override
public List<String> getPendingFlowObjectives() {
- List<String> pendingNexts = new ArrayList<>();
+ List<String> pendingFlowObjectives = new ArrayList<>();
+
for (Integer nextId : pendingForwards.keySet()) {
- Set<PendingNext> pnext = pendingForwards.get(nextId);
+ Set<PendingFlowObjective> pfwd = pendingForwards.get(nextId);
StringBuilder pend = new StringBuilder();
- pend.append("Next Id: ").append(Integer.toString(nextId))
- .append(" :: ");
- for (PendingNext pn : pnext) {
- pend.append(Integer.toString(pn.forwardingObjective().id()))
- .append(" ");
+ pend.append("NextId: ")
+ .append(nextId);
+ for (PendingFlowObjective pf : pfwd) {
+ pend.append("\n FwdId: ")
+ .append(String.format("%11s", pf.flowObjective().id()))
+ .append(", DeviceId: ")
+ .append(pf.deviceId())
+ .append(", Selector: ")
+ .append(((ForwardingObjective) pf.flowObjective())
+ .selector().criteria());
}
- pendingNexts.add(pend.toString());
+ pendingFlowObjectives.add(pend.toString());
}
- return pendingNexts;
+
+ for (Integer nextId : pendingNexts.keySet()) {
+ Set<PendingFlowObjective> pnext = pendingNexts.get(nextId);
+ StringBuilder pend = new StringBuilder();
+ pend.append("NextId: ")
+ .append(nextId);
+ for (PendingFlowObjective pn : pnext) {
+ pend.append("\n NextOp: ")
+ .append(pn.flowObjective().op())
+ .append(", DeviceId: ")
+ .append(pn.deviceId())
+ .append(", Treatments: ")
+ .append(((NextObjective) pn.flowObjective())
+ .next());
+ }
+ pendingFlowObjectives.add(pend.toString());
+ }
+
+ return pendingFlowObjectives;
}
@Override
@@ -206,23 +246,18 @@
return getPendingFlowObjectives();
}
- private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
- if (fwd.nextId() == null ||
- flowObjectiveStore.getNextGroup(fwd.nextId()) != null) {
- // fast path
- return false;
- }
+ private boolean queueFwdObjective(DeviceId deviceId, ForwardingObjective fwd) {
boolean queued = false;
synchronized (pendingForwards) {
// double check the flow objective store, because this block could run
// after a notification arrives
if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
pendingForwards.compute(fwd.nextId(), (id, pending) -> {
- PendingNext next = new PendingNext(deviceId, fwd);
+ PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, fwd);
if (pending == null) {
- return Sets.newHashSet(next);
+ return Sets.newHashSet(pendfo);
} else {
- pending.add(next);
+ pending.add(pendfo);
return pending;
}
});
@@ -236,6 +271,34 @@
return queued;
}
+ private boolean queueNextObjective(DeviceId deviceId, NextObjective next) {
+
+ // we need to hold off on other operations till we get notified that the
+ // initial group creation has succeeded
+ boolean queued = false;
+ synchronized (pendingNexts) {
+ // double check the flow objective store, because this block could run
+ // after a notification arrives
+ if (flowObjectiveStore.getNextGroup(next.id()) == null) {
+ pendingNexts.compute(next.id(), (id, pending) -> {
+ PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, next);
+ if (pending == null) {
+ return Sets.newHashSet(pendfo);
+ } else {
+ pending.add(pendfo);
+ return pending;
+ }
+ });
+ queued = true;
+ }
+ }
+ if (queued) {
+ log.debug("Queued next objective {} with operation {} meant for device {}",
+ next.id(), next.op(), deviceId);
+ }
+ return queued;
+ }
+
/**
* Task that passes the flow objective down to the driver. The task will
* make a few attempts to find the appropriate driver, then eventually give
@@ -264,6 +327,7 @@
if (pipeliner != null) {
if (objective instanceof NextObjective) {
+ nextToDevice.put(objective.id(), deviceId);
pipeliner.next((NextObjective) objective);
} else if (objective instanceof ForwardingObjective) {
pipeliner.forward((ForwardingObjective) objective);
@@ -280,7 +344,7 @@
objective.context().ifPresent(
c -> c.onError(objective, ObjectiveError.NOPIPELINER));
}
- //Excpetion thrown
+ //Exception thrown
} catch (Exception e) {
log.warn("Exception while installing flow objective", e);
}
@@ -292,21 +356,37 @@
public void notify(ObjectiveEvent event) {
if (event.type() == ObjectiveEvent.Type.ADD) {
log.debug("Received notification of obj event {}", event);
- Set<PendingNext> pending;
+ Set<PendingFlowObjective> pending;
+
+ // first send all pending flows
synchronized (pendingForwards) {
// needs to be synchronized for queueObjective lookup
pending = pendingForwards.remove(event.subject());
}
-
if (pending == null) {
- log.debug("Nothing pending for this obj event {}", event);
- return;
+ log.debug("No forwarding objectives pending for this "
+ + "obj event {}", event);
+ } else {
+ log.debug("Processing {} pending forwarding objectives for nextId {}",
+ pending.size(), event.subject());
+ pending.forEach(p -> getDevicePipeliner(p.deviceId())
+ .forward((ForwardingObjective) p.flowObjective()));
}
- log.debug("Processing {} pending forwarding objectives for nextId {}",
- pending.size(), event.subject());
- pending.forEach(p -> getDevicePipeliner(p.deviceId())
- .forward(p.forwardingObjective()));
+ // now check for pending next-objectives
+ synchronized (pendingNexts) {
+ // needs to be synchronized for queueObjective lookup
+ pending = pendingNexts.remove(event.subject());
+ }
+ if (pending == null) {
+ log.debug("No next objectives pending for this "
+ + "obj event {}", event);
+ } else {
+ log.debug("Processing {} pending next objectives for nextId {}",
+ pending.size(), event.subject());
+ pending.forEach(p -> getDevicePipeliner(p.deviceId())
+ .next((NextObjective) p.flowObjective()));
+ }
}
}
}
@@ -353,29 +433,34 @@
}
/**
- * Data class used to hold a pending forwarding objective that could not
+ * Data class used to hold a pending flow objective that could not
* be processed because the associated next object was not present.
+ * Note that this pending flow objective could be a forwarding objective
+ * waiting for a next objective to complete execution. Or it could a
+ * next objective (with a different operation - remove, addToExisting, or
+ * removeFromExisting) waiting for a next objective with the same id to
+ * complete execution.
*/
- private class PendingNext {
+ private class PendingFlowObjective {
private final DeviceId deviceId;
- private final ForwardingObjective fwd;
+ private final Objective flowObj;
- public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
+ public PendingFlowObjective(DeviceId deviceId, Objective flowObj) {
this.deviceId = deviceId;
- this.fwd = fwd;
+ this.flowObj = flowObj;
}
public DeviceId deviceId() {
return deviceId;
}
- public ForwardingObjective forwardingObjective() {
- return fwd;
+ public Objective flowObjective() {
+ return flowObj;
}
@Override
public int hashCode() {
- return Objects.hash(deviceId, fwd);
+ return Objects.hash(deviceId, flowObj);
}
@Override
@@ -383,12 +468,12 @@
if (this == obj) {
return true;
}
- if (!(obj instanceof PendingNext)) {
+ if (!(obj instanceof PendingFlowObjective)) {
return false;
}
- final PendingNext other = (PendingNext) obj;
+ final PendingFlowObjective other = (PendingFlowObjective) obj;
if (this.deviceId.equals(other.deviceId) &&
- this.fwd.equals(other.fwd)) {
+ this.flowObj.equals(other.flowObj)) {
return true;
}
return false;