CORD-1180 Collection of fixes for hash-group buckets. Required the following changes:
	  Next-objectives that edited groups are now queued in the FlowObjectiveManager instead of the driver.
	  During linkup immediately checking for previous portups that should be added to a hash group.
	  A final retry 30 secs later to catch all ports that should be part of the same hash group.

Change-Id: I7ef450149d685890ca47932b8e559a0c11dc5ab4
diff --git a/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java b/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
index fce4e50..fec8b39 100644
--- a/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
+++ b/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
@@ -55,9 +55,13 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -97,6 +101,10 @@
             portNextObjStore = null;
     private SegmentRoutingManager srManager;
 
+    private static final long RETRY_INTERVAL_SEC = 30;
+    private ScheduledExecutorService executorService
+    = newScheduledThreadPool(1, groupedThreads("retryhashbkts", "retry-%d", log));
+
     protected KryoNamespace.Builder kryo = new KryoNamespace.Builder()
             .register(URI.class).register(HashSet.class)
             .register(DeviceId.class).register(PortNumber.class)
@@ -183,7 +191,7 @@
      * discovered on this device.
      *
      * @param newLink new neighbor link
-     * @param isMaster true if local instance is the master
+     * @param isMaster true if local instance is the master for src-device of link
      *
      */
     public void linkUp(Link newLink, boolean isMaster) {
@@ -231,55 +239,79 @@
                 .filter((ns) -> (ns.getDeviceIds()
                         .contains(newLink.dst().deviceId())))
                 .collect(Collectors.toSet());
-        log.trace("linkUp: nsNextObjStore contents for device {}:",
-                deviceId,
-                nsSet);
+        log.debug("linkUp: nsNextObjStore contents for device {}: {}",
+                  deviceId, nsSet);
         for (NeighborSet ns : nsSet) {
             Integer nextId = nsNextObjStore.
                     get(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
             if (nextId != null && isMaster) {
-                // Create the new bucket to be updated
-                TrafficTreatment.Builder tBuilder =
-                        DefaultTrafficTreatment.builder();
-                tBuilder.setOutput(newLink.src().port())
-                    .setEthDst(dstMac)
-                    .setEthSrc(nodeMacAddr);
-                if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
-                    tBuilder.pushMpls()
-                        .copyTtlOut()
-                        .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+                addToHashedNextObjective(newLink.src().port(), dstMac, ns,
+                                         nextId, false);
+                // some links may have come up before the next-objective was created
+                // we take this opportunity to ensure other ports to same next-hop-dst
+                // are part of the hash group (see CORD-1180). Duplicate additions
+                // to the same hash group are avoided by the driver.
+                for (PortNumber p : devicePortMap.get(newLink.dst().deviceId())) {
+                    if (p.equals(newLink.src().port())) {
+                        continue;
+                    }
+                    addToHashedNextObjective(p, dstMac, ns, nextId, false);
                 }
-                // setup metadata to pass to nextObjective - indicate the vlan on egress
-                // if needed by the switch pipeline. Since hashed next-hops are always to
-                // other neighboring routers, there is no subnet assigned on those ports.
-                TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
-                metabuilder.matchVlanId(INTERNAL_VLAN);
-
-                NextObjective.Builder nextObjBuilder = DefaultNextObjective.builder()
-                        .withId(nextId)
-                        .withType(NextObjective.Type.HASHED)
-                        .addTreatment(tBuilder.build())
-                        .withMeta(metabuilder.build())
-                        .fromApp(appId);
-                log.info("**linkUp in device {}: Adding Bucket "
-                        + "with Port {} to next object id {}",
-                        deviceId,
-                        newLink.src().port(),
-                        nextId);
-
-                ObjectiveContext context = new DefaultObjectiveContext(
-                        (objective) -> log.debug("LinkUp addedTo NextObj {} on {}",
-                                nextId, deviceId),
-                        (objective, error) ->
-                                log.warn("LinkUp failed to addTo NextObj {} on {}: {}",
-                                        nextId, deviceId, error));
-                NextObjective nextObjective = nextObjBuilder.addToExisting(context);
-                flowObjectiveService.next(deviceId, nextObjective);
             } else if (isMaster) {
                 log.warn("linkUp in device {}, but global store has no record "
                         + "for neighbor-set {}", deviceId, ns);
             }
         }
+
+        // It's possible that at the time of linkup, some hash-groups have
+        // not been created yet by the instance responsible for creating them, or
+        // due to the eventually-consistent nature of the nsNextObjStore it has
+        // not synced up with this instance yet. Thus we perform this check again
+        // after a delay (see CORD-1180). Duplicate additions to the same hash group
+        // are avoided by the driver.
+        if (isMaster) {
+            executorService.schedule(new RetryHashBkts(newLink, dstMac),
+                                     RETRY_INTERVAL_SEC, TimeUnit.SECONDS);
+        }
+    }
+
+    private void addToHashedNextObjective(PortNumber outport, MacAddress dstMac,
+            NeighborSet ns, Integer nextId, boolean retry) {
+        // Create the new bucket to be updated
+        TrafficTreatment.Builder tBuilder =
+                DefaultTrafficTreatment.builder();
+        tBuilder.setOutput(outport)
+            .setEthDst(dstMac)
+            .setEthSrc(nodeMacAddr);
+        if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+            tBuilder.pushMpls()
+                .copyTtlOut()
+                .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+        }
+        // setup metadata to pass to nextObjective - indicate the vlan on egress
+        // if needed by the switch pipeline. Since hashed next-hops are always to
+        // other neighboring routers, there is no subnet assigned on those ports.
+        TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
+        metabuilder.matchVlanId(INTERNAL_VLAN);
+
+        NextObjective.Builder nextObjBuilder = DefaultNextObjective.builder()
+                .withId(nextId)
+                .withType(NextObjective.Type.HASHED)
+                .addTreatment(tBuilder.build())
+                .withMeta(metabuilder.build())
+                .fromApp(appId);
+        log.info("{} in device {}: Adding Bucket with Port {} to next object id {}",
+                 (retry) ? "**retry" : "**linkup",
+                         deviceId, outport, nextId);
+
+        ObjectiveContext context = new DefaultObjectiveContext(
+                (objective) -> log.debug("LinkUp addedTo NextObj {} on {}",
+                        nextId, deviceId),
+                (objective, error) ->
+                        log.warn("LinkUp failed to addTo NextObj {} on {}: {}",
+                                nextId, deviceId, error));
+        NextObjective nextObjective = nextObjBuilder.addToExisting(context);
+        flowObjectiveService.next(deviceId, nextObjective);
     }
 
     /**
@@ -735,7 +767,7 @@
                                     + " NextObj {} on {}: {}", nextId, deviceId, error)
                     );
             NextObjective nextObj = nextObjBuilder.add(context);
-            log.debug("**createGroupsFromNeighborsets: Submited "
+            log.debug("**createGroupsFromNeighborsets: Submitted "
                     + "next objective {} in device {}",
                     nextId, deviceId);
             flowObjectiveService.next(deviceId, nextObj);
@@ -907,4 +939,43 @@
         }
         // should probably clean local stores port-neighbor
     }
+
+    /**
+     * RetryHashBkts is a one-time retry at populating all the buckets of a
+     * hash group based on the given link. Should only be called by the
+     * master instance of the src-device of the link.
+     */
+    protected final class RetryHashBkts implements Runnable {
+        Link link;
+        MacAddress dstMac;
+
+        private RetryHashBkts(Link link, MacAddress dstMac) {
+            this.link = link;
+            this.dstMac = dstMac;
+        }
+
+        @Override
+        public void run() {
+            log.info("RETRY Hash buckets for linkup: {}", link);
+            Set<NeighborSet> nsSet = nsNextObjStore.keySet()
+                    .stream()
+                    .filter(nsStoreEntry -> nsStoreEntry.deviceId().equals(deviceId))
+                    .map(nsStoreEntry -> nsStoreEntry.neighborSet())
+                    .filter(ns -> ns.getDeviceIds()
+                            .contains(link.dst().deviceId()))
+                    .collect(Collectors.toSet());
+            log.debug("retry-link: nsNextObjStore contents for device {}: {}",
+                      deviceId, nsSet);
+            for (NeighborSet ns : nsSet) {
+                Integer nextId = nsNextObjStore.
+                        get(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
+                if (nextId != null) {
+                    addToHashedNextObjective(link.src().port(), dstMac, ns,
+                                             nextId, true);
+                }
+            }
+        }
+    }
+
+
 }