CORD-1180 Collection of fixes for hash-group buckets. Required the following changes:
	  Next-objectives that edited groups are now queued in the FlowObjectiveManager instead of the driver.
	  During linkup immediately checking for previous portups that should be added to a hash group.
	  A final retry 30 secs later to catch all ports that should be part of the same hash group.

Change-Id: I7ef450149d685890ca47932b8e559a0c11dc5ab4
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 47b63ed..f51ee7c 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -48,6 +48,7 @@
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.Objective.Operation;
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.flowobjective.ObjectiveEvent;
 import org.onosproject.net.flowobjective.ObjectiveEvent.Type;
@@ -126,7 +127,14 @@
 
     protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
 
-    private final Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
+    // local stores for queuing fwd and next objectives that are waiting for an
+    // associated next objective execution to complete. The signal for completed
+    // execution comes from a pipeline driver, in this or another controller
+    // instance, via the DistributedFlowObjectiveStore.
+    private final Map<Integer, Set<PendingFlowObjective>> pendingForwards =
+            Maps.newConcurrentMap();
+    private final Map<Integer, Set<PendingFlowObjective>> pendingNexts =
+            Maps.newConcurrentMap();
 
     // local store to track which nextObjectives were sent to which device
     // for debugging purposes
@@ -202,6 +210,7 @@
 
                 if (pipeliner != null) {
                     if (objective instanceof NextObjective) {
+                        nextToDevice.put(objective.id(), deviceId);
                         pipeliner.next((NextObjective) objective);
                     } else if (objective instanceof ForwardingObjective) {
                         pipeliner.forward((ForwardingObjective) objective);
@@ -218,7 +227,7 @@
                     objective.context().ifPresent(
                             c -> c.onError(objective, ObjectiveError.NOPIPELINER));
                 }
-                //Excpetion thrown
+                //Exception thrown
             } catch (Exception e) {
                 log.warn("Exception while installing flow objective", e);
             }
@@ -234,7 +243,7 @@
     @Override
     public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
         checkPermission(FLOWRULE_WRITE);
-        if (queueObjective(deviceId, forwardingObjective)) {
+        if (queueFwdObjective(deviceId, forwardingObjective)) {
             return;
         }
         executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
@@ -243,7 +252,9 @@
     @Override
     public void next(DeviceId deviceId, NextObjective nextObjective) {
         checkPermission(FLOWRULE_WRITE);
-        nextToDevice.put(nextObjective.id(), deviceId);
+        if (queueNextObjective(deviceId, nextObjective)) {
+            return;
+        }
         executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
     }
 
@@ -256,7 +267,7 @@
     @Override
     public void initPolicy(String policy) {}
 
-    private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
+    private boolean queueFwdObjective(DeviceId deviceId, ForwardingObjective fwd) {
         if (fwd.nextId() == null ||
                 flowObjectiveStore.getNextGroup(fwd.nextId()) != null ||
                 fwd.op() == Objective.Operation.REMOVE) {
@@ -269,11 +280,11 @@
             // after a notification arrives
             if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
                 pendingForwards.compute(fwd.nextId(), (id, pending) -> {
-                    PendingNext next = new PendingNext(deviceId, fwd);
+                    PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, fwd);
                     if (pending == null) {
-                        return Sets.newHashSet(next);
+                        return Sets.newHashSet(pendfo);
                     } else {
-                        pending.add(next);
+                        pending.add(pendfo);
                         return pending;
                     }
                 });
@@ -287,6 +298,38 @@
         return queued;
     }
 
+    private boolean queueNextObjective(DeviceId deviceId, NextObjective next) {
+        if (flowObjectiveStore.getNextGroup(next.id()) != null ||
+                next.op() == Operation.ADD) {
+            // either group exists or we are trying to create it - let it through
+            return false;
+        }
+        // we need to hold off on other operations till we get notified that the
+        // initial group creation has succeeded
+        boolean queued = false;
+        synchronized (pendingNexts) {
+            // double check the flow objective store, because this block could run
+            // after a notification arrives
+            if (flowObjectiveStore.getNextGroup(next.id()) == null) {
+                pendingNexts.compute(next.id(), (id, pending) -> {
+                    PendingFlowObjective pendfo = new PendingFlowObjective(deviceId, next);
+                    if (pending == null) {
+                        return Sets.newHashSet(pendfo);
+                    } else {
+                        pending.add(pendfo);
+                        return pending;
+                    }
+                });
+                queued = true;
+            }
+        }
+        if (queued) {
+            log.debug("Queued next objective {} with operation {} meant for device {}",
+                      next.id(), next.op(), deviceId);
+        }
+        return queued;
+    }
+
     /**
      * Retrieves (if it exists) the device pipeline behaviour from the cache.
      * Otherwise it warms the caches and triggers the init method of the Pipeline.
@@ -449,49 +492,70 @@
         public void notify(ObjectiveEvent event) {
             if (event.type() == Type.ADD) {
                 log.debug("Received notification of obj event {}", event);
-                Set<PendingNext> pending;
+                Set<PendingFlowObjective> pending;
+
+                // first send all pending flows
                 synchronized (pendingForwards) {
                     // needs to be synchronized for queueObjective lookup
                     pending = pendingForwards.remove(event.subject());
                 }
-
                 if (pending == null) {
-                    log.debug("Nothing pending for this obj event {}", event);
-                    return;
+                    log.debug("No forwarding objectives pending for this "
+                            + "obj event {}", event);
+                } else {
+                    log.debug("Processing {} pending forwarding objectives for nextId {}",
+                              pending.size(), event.subject());
+                    pending.forEach(p -> getDevicePipeliner(p.deviceId())
+                                    .forward((ForwardingObjective) p.flowObjective()));
                 }
 
-                log.debug("Processing {} pending forwarding objectives for nextId {}",
-                         pending.size(), event.subject());
-                pending.forEach(p -> getDevicePipeliner(p.deviceId())
-                                .forward(p.forwardingObjective()));
+                // now check for pending next-objectives
+                synchronized (pendingNexts) {
+                    // needs to be synchronized for queueObjective lookup
+                    pending = pendingNexts.remove(event.subject());
+                }
+                if (pending == null) {
+                    log.debug("No next objectives pending for this "
+                            + "obj event {}", event);
+                } else {
+                    log.debug("Processing {} pending next objectives for nextId {}",
+                              pending.size(), event.subject());
+                    pending.forEach(p -> getDevicePipeliner(p.deviceId())
+                                    .next((NextObjective) p.flowObjective()));
+                }
             }
         }
     }
 
     /**
-     * Data class used to hold a pending forwarding objective that could not
+     * Data class used to hold a pending flow objective that could not
      * be processed because the associated next object was not present.
+     * Note that this pending flow objective could be a forwarding objective
+     * waiting for a next objective to complete execution. Or it could a
+     * next objective (with a different operation - remove, addToExisting, or
+     * removeFromExisting) waiting for a next objective with the same id to
+     * complete execution.
      */
-    private class PendingNext {
+    private class PendingFlowObjective {
         private final DeviceId deviceId;
-        private final ForwardingObjective fwd;
+        private final Objective flowObj;
 
-        public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
+        public PendingFlowObjective(DeviceId deviceId, Objective flowObj) {
             this.deviceId = deviceId;
-            this.fwd = fwd;
+            this.flowObj = flowObj;
         }
 
         public DeviceId deviceId() {
             return deviceId;
         }
 
-        public ForwardingObjective forwardingObjective() {
-            return fwd;
+        public Objective flowObjective() {
+            return flowObj;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(deviceId, fwd);
+            return Objects.hash(deviceId, flowObj);
         }
 
         @Override
@@ -499,12 +563,12 @@
             if (this == obj) {
                 return true;
             }
-            if (!(obj instanceof PendingNext)) {
+            if (!(obj instanceof PendingFlowObjective)) {
                 return false;
             }
-            final PendingNext other = (PendingNext) obj;
+            final PendingFlowObjective other = (PendingFlowObjective) obj;
             if (this.deviceId.equals(other.deviceId) &&
-                    this.fwd.equals(other.fwd)) {
+                    this.flowObj.equals(other.flowObj)) {
                 return true;
             }
             return false;
@@ -541,24 +605,48 @@
     }
 
     @Override
-    public List<String> getPendingNexts() {
-        List<String> pendingNexts = new ArrayList<>();
-        for (Integer nextId : pendingForwards.keySet()) {
-            Set<PendingNext> pnext = pendingForwards.get(nextId);
+    public List<String> getPendingFlowObjectives() {
+        List<String> pendingFlowObjectives = new ArrayList<>();
 
+        for (Integer nextId : pendingForwards.keySet()) {
+            Set<PendingFlowObjective> pfwd = pendingForwards.get(nextId);
             StringBuilder pend = new StringBuilder();
             pend.append("NextId: ")
                     .append(nextId);
-            for (PendingNext pn : pnext) {
+            for (PendingFlowObjective pf : pfwd) {
                 pend.append("\n    FwdId: ")
-                        .append(String.format("%11s", pn.forwardingObjective().id()))
+                        .append(String.format("%11s", pf.flowObjective().id()))
+                        .append(", DeviceId: ")
+                        .append(pf.deviceId())
+                        .append(", Selector: ")
+                        .append(((ForwardingObjective) pf.flowObjective())
+                                    .selector().criteria());
+            }
+            pendingFlowObjectives.add(pend.toString());
+        }
+
+        for (Integer nextId : pendingNexts.keySet()) {
+            Set<PendingFlowObjective> pnext = pendingNexts.get(nextId);
+            StringBuilder pend = new StringBuilder();
+            pend.append("NextId: ")
+                    .append(nextId);
+            for (PendingFlowObjective pn : pnext) {
+                pend.append("\n    NextOp: ")
+                        .append(pn.flowObjective().op())
                         .append(", DeviceId: ")
                         .append(pn.deviceId())
-                        .append(", Selector: ")
-                        .append(pn.forwardingObjective().selector().criteria());
+                        .append(", Treatments: ")
+                        .append(((NextObjective) pn.flowObjective())
+                                    .next());
             }
-            pendingNexts.add(pend.toString());
+            pendingFlowObjectives.add(pend.toString());
         }
-        return pendingNexts;
+
+        return pendingFlowObjectives;
+    }
+
+    @Override
+    public List<String> getPendingNexts() {
+        return getPendingFlowObjectives();
     }
 }