fix: (vNet) reflect changes of flow objective service

Reflects the changes made for ONOS-6476.
Paired with the core flow objective service.

Change-Id: I67c323fe6863176ac2b8ca73774d1ee7261b69c0
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java
index b28a34e..36b286d 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java
@@ -102,7 +102,15 @@
     private final PipelinerContext context = new InnerPipelineContext();
 
     private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
-    private final Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
+
+    // local stores for queuing fwd and next objectives that are waiting for an
+    // associated next objective execution to complete. The signal for completed
+    // execution comes from a pipeline driver, in this or another controller
+    // instance, via the DistributedFlowObjectiveStore.
+    private final Map<Integer, Set<PendingFlowObjective>> pendingForwards =
+            Maps.newConcurrentMap();
+    private final Map<Integer, Set<PendingFlowObjective>> pendingNexts =
+            Maps.newConcurrentMap();
 
     // local store to track which nextObjectives were sent to which device
     // for debugging purposes
@@ -133,16 +141,24 @@
 
     @Override
     public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
-        if (queueObjective(deviceId, forwardingObjective)) {
-            return;
+        if (forwardingObjective.nextId() == null ||
+                forwardingObjective.op() == Objective.Operation.REMOVE ||
+                flowObjectiveStore.getNextGroup(forwardingObjective.nextId()) != null ||
+                !queueFwdObjective(deviceId, forwardingObjective)) {
+            // fast path
+            executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
         }
-        executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
     }
 
     @Override
     public void next(DeviceId deviceId, NextObjective nextObjective) {
         nextToDevice.put(nextObjective.id(), deviceId);
-        executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
+        if (nextObjective.op() == Objective.Operation.ADD ||
+                flowObjectiveStore.getNextGroup(nextObjective.id()) != null ||
+                !queueNextObjective(deviceId, nextObjective)) {
+            // either group exists or we are trying to create it - let it through
+            executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
+        }
     }
 
     @Override
@@ -186,19 +202,43 @@
 
     @Override
     public List<String> getPendingFlowObjectives() {
-        List<String> pendingNexts = new ArrayList<>();
+        List<String> pendingFlowObjectives = new ArrayList<>();
+
         for (Integer nextId : pendingForwards.keySet()) {
-            Set<PendingNext> pnext = pendingForwards.get(nextId);
+            Set<PendingFlowObjective> pfwd = pendingForwards.get(nextId);
             StringBuilder pend = new StringBuilder();
-            pend.append("Next Id: ").append(Integer.toString(nextId))
-                    .append(" :: ");
-            for (PendingNext pn : pnext) {
-                pend.append(Integer.toString(pn.forwardingObjective().id()))
-                        .append(" ");
+            pend.append("NextId: ")
+                    .append(nextId);
+            for (PendingFlowObjective pf : pfwd) {
+                pend.append("\n    FwdId: ")
+                        .append(String.format("%11s", pf.flowObjective().id()))
+                        .append(", DeviceId: ")
+                        .append(pf.deviceId())
+                        .append(", Selector: ")
+                        .append(((ForwardingObjective) pf.flowObjective())
+                                        .selector().criteria());
             }
-            pendingNexts.add(pend.toString());
+            pendingFlowObjectives.add(pend.toString());
         }
-        return pendingNexts;
+
+        for (Integer nextId : pendingNexts.keySet()) {
+            Set<PendingFlowObjective> pnext = pendingNexts.get(nextId);
+            StringBuilder pend = new StringBuilder();
+            pend.append("NextId: ")
+                    .append(nextId);
+            for (PendingFlowObjective pn : pnext) {
+                pend.append("\n    NextOp: ")
+                        .append(pn.flowObjective().op())
+                        .append(", DeviceId: ")
+                        .append(pn.deviceId())
+                        .append(", Treatments: ")
+                        .append(((NextObjective) pn.flowObjective())
+                                        .next());
+            }
+            pendingFlowObjectives.add(pend.toString());
+        }
+
+        return pendingFlowObjectives;
     }
 
     @Override
@@ -206,23 +246,18 @@
         return getPendingFlowObjectives();
     }
 
-    private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
-        if (fwd.nextId() == null ||
-                flowObjectiveStore.getNextGroup(fwd.nextId()) != null) {
-            // fast path
-            return false;
-        }
+    private boolean queueFwdObjective(DeviceId deviceId, ForwardingObjective fwd) {
         boolean queued = false;
         synchronized (pendingForwards) {
             // double check the flow objective store, because this block could run
             // after a notification arrives
             if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
                 pendingForwards.compute(fwd.nextId(), (id, pending) -> {
-                    PendingNext next = new PendingNext(deviceId, fwd);
+                    PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, fwd);
                     if (pending == null) {
-                        return Sets.newHashSet(next);
+                        return Sets.newHashSet(pendfo);
                     } else {
-                        pending.add(next);
+                        pending.add(pendfo);
                         return pending;
                     }
                 });
@@ -236,6 +271,34 @@
         return queued;
     }
 
+    private boolean queueNextObjective(DeviceId deviceId, NextObjective next) {
+
+        // we need to hold off on other operations till we get notified that the
+        // initial group creation has succeeded
+        boolean queued = false;
+        synchronized (pendingNexts) {
+            // double check the flow objective store, because this block could run
+            // after a notification arrives
+            if (flowObjectiveStore.getNextGroup(next.id()) == null) {
+                pendingNexts.compute(next.id(), (id, pending) -> {
+                    PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, next);
+                    if (pending == null) {
+                        return Sets.newHashSet(pendfo);
+                    } else {
+                        pending.add(pendfo);
+                        return pending;
+                    }
+                });
+                queued = true;
+            }
+        }
+        if (queued) {
+            log.debug("Queued next objective {} with operation {} meant for device {}",
+                      next.id(), next.op(), deviceId);
+        }
+        return queued;
+    }
+
     /**
      * Task that passes the flow objective down to the driver. The task will
      * make a few attempts to find the appropriate driver, then eventually give
@@ -264,6 +327,7 @@
 
                 if (pipeliner != null) {
                     if (objective instanceof NextObjective) {
+                        nextToDevice.put(objective.id(), deviceId);
                         pipeliner.next((NextObjective) objective);
                     } else if (objective instanceof ForwardingObjective) {
                         pipeliner.forward((ForwardingObjective) objective);
@@ -280,7 +344,7 @@
                     objective.context().ifPresent(
                             c -> c.onError(objective, ObjectiveError.NOPIPELINER));
                 }
-                //Excpetion thrown
+                //Exception thrown
             } catch (Exception e) {
                 log.warn("Exception while installing flow objective", e);
             }
@@ -292,21 +356,37 @@
         public void notify(ObjectiveEvent event) {
             if (event.type() == ObjectiveEvent.Type.ADD) {
                 log.debug("Received notification of obj event {}", event);
-                Set<PendingNext> pending;
+                Set<PendingFlowObjective> pending;
+
+                // first send all pending flows
                 synchronized (pendingForwards) {
                     // needs to be synchronized for queueObjective lookup
                     pending = pendingForwards.remove(event.subject());
                 }
-
                 if (pending == null) {
-                    log.debug("Nothing pending for this obj event {}", event);
-                    return;
+                    log.debug("No forwarding objectives pending for this "
+                                      + "obj event {}", event);
+                } else {
+                    log.debug("Processing {} pending forwarding objectives for nextId {}",
+                              pending.size(), event.subject());
+                    pending.forEach(p -> getDevicePipeliner(p.deviceId())
+                            .forward((ForwardingObjective) p.flowObjective()));
                 }
 
-                log.debug("Processing {} pending forwarding objectives for nextId {}",
-                          pending.size(), event.subject());
-                pending.forEach(p -> getDevicePipeliner(p.deviceId())
-                        .forward(p.forwardingObjective()));
+                // now check for pending next-objectives
+                synchronized (pendingNexts) {
+                    // needs to be synchronized for queueNextObjective lookup
+                    pending = pendingNexts.remove(event.subject());
+                }
+                if (pending == null) {
+                    log.debug("No next objectives pending for this "
+                                      + "obj event {}", event);
+                } else {
+                    log.debug("Processing {} pending next objectives for nextId {}",
+                              pending.size(), event.subject());
+                    pending.forEach(p -> getDevicePipeliner(p.deviceId())
+                            .next((NextObjective) p.flowObjective()));
+                }
             }
         }
     }
@@ -353,29 +433,34 @@
     }
 
     /**
-     * Data class used to hold a pending forwarding objective that could not
+     * Data class used to hold a pending flow objective that could not
      * be processed because the associated next object was not present.
+     * Note that this pending flow objective could be a forwarding objective
+     * waiting for a next objective to complete execution. Or it could be a
+     * next objective (with a different operation - remove, addToExisting, or
+     * removeFromExisting) waiting for a next objective with the same id to
+     * complete execution.
      */
-    private class PendingNext {
+    private class PendingFlowObjective {
         private final DeviceId deviceId;
-        private final ForwardingObjective fwd;
+        private final Objective flowObj;
 
-        public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
+        public PendingFlowObjective(DeviceId deviceId, Objective flowObj) {
             this.deviceId = deviceId;
-            this.fwd = fwd;
+            this.flowObj = flowObj;
         }
 
         public DeviceId deviceId() {
             return deviceId;
         }
 
-        public ForwardingObjective forwardingObjective() {
-            return fwd;
+        public Objective flowObjective() {
+            return flowObj;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(deviceId, fwd);
+            return Objects.hash(deviceId, flowObj);
         }
 
         @Override
@@ -383,12 +468,12 @@
             if (this == obj) {
                 return true;
             }
-            if (!(obj instanceof PendingNext)) {
+            if (!(obj instanceof PendingFlowObjective)) {
                 return false;
             }
-            final PendingNext other = (PendingNext) obj;
+            final PendingFlowObjective other = (PendingFlowObjective) obj;
             if (this.deviceId.equals(other.deviceId) &&
-                    this.fwd.equals(other.fwd)) {
+                    this.flowObj.equals(other.flowObj)) {
                 return true;
             }
             return false;