CORD-1583 Bug fixes for dual ToRs

Two things:
  - In dual (paired) ToR scenarios it is possible to have the same outport
    in multiple buckets in a hash group, as long as they have different labels.
    When adding buckets this was taken into account. But when removing buckets,
    only outport was being checked. This bug fix ensures that labels are checked
    as well when removing buckets.
  - In dual ToR scenarios, getting the right set of hash buckets proved difficult
    with the existing 'retryHash' mechanism. It has been removed and replaced with
    a bucket corrector mechanism that periodically corrects the hash group buckets
    when the topology has been stable for the last 10 secs. This required the
    introduction of a VERIFY operation in Next Objectives. A CLI command was also
    added to trigger this operation manually.

Change-Id: Ib0d2734060fadc6e7a4bd0d75f3409e194413a97
diff --git a/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java b/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
index 85b0c76..6f06d60 100644
--- a/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
+++ b/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
@@ -40,6 +40,7 @@
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.link.LinkService;
+import org.onosproject.segmentrouting.DefaultRoutingHandler;
 import org.onosproject.segmentrouting.SegmentRoutingManager;
 import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
 import org.onosproject.segmentrouting.config.DeviceProperties;
@@ -76,6 +77,8 @@
 public class DefaultGroupHandler {
     protected static final Logger log = getLogger(DefaultGroupHandler.class);
 
+    private static final long VERIFY_INTERVAL = 30; // secs
+
     protected final DeviceId deviceId;
     protected final ApplicationId appId;
     protected final DeviceProperties deviceConfig;
@@ -109,9 +112,8 @@
             portNextObjStore = null;
     private SegmentRoutingManager srManager;
 
-    private static final long RETRY_INTERVAL_SEC = 30;
     private ScheduledExecutorService executorService
-    = newScheduledThreadPool(1, groupedThreads("retryhashbkts", "retry-%d", log));
+    = newScheduledThreadPool(1, groupedThreads("bktCorrector", "bktC-%d", log));
 
     protected KryoNamespace.Builder kryo = new KryoNamespace.Builder()
             .register(URI.class).register(HashSet.class)
@@ -145,11 +147,21 @@
         this.vlanNextObjStore = srManager.vlanNextObjStore();
         this.portNextObjStore = srManager.portNextObjStore();
         this.srManager = srManager;
-
+        executorService.scheduleWithFixedDelay(new BucketCorrector(), 10,
+                                               VERIFY_INTERVAL,
+                                               TimeUnit.SECONDS);
         populateNeighborMaps();
     }
 
     /**
+     * Gracefully shuts down a groupHandler by shutting down the executor on which the
+     * periodic bucket corrector is scheduled. Call when the handler is no longer needed.
+     */
+    public void shutdown() {
+        executorService.shutdown();
+    }
+
+    /**
      * Creates a group handler object.
      *
      * @param deviceId device identifier
@@ -162,13 +174,13 @@
      * @return default group handler type
      */
     public static DefaultGroupHandler createGroupHandler(
-                                          DeviceId deviceId,
-                                          ApplicationId appId,
-                                          DeviceProperties config,
-                                          LinkService linkService,
-                                          FlowObjectiveService flowObjService,
-                                          SegmentRoutingManager srManager)
-                                                  throws DeviceConfigNotFoundException {
+                                                         DeviceId deviceId,
+                                                         ApplicationId appId,
+                                                         DeviceProperties config,
+                                                         LinkService linkService,
+                                                         FlowObjectiveService flowObjService,
+                                                         SegmentRoutingManager srManager)
+                                                                 throws DeviceConfigNotFoundException {
         return new DefaultGroupHandler(deviceId, appId, config,
                                        linkService,
                                        flowObjService,
@@ -181,35 +193,35 @@
      * @param link the infrastructure link
      */
     public void portUpForLink(Link link) {
-       if (!link.src().deviceId().equals(deviceId)) {
-           log.warn("linkUp: deviceId{} doesn't match with link src {}",
-                    deviceId, link.src().deviceId());
-           return;
-       }
+        if (!link.src().deviceId().equals(deviceId)) {
+            log.warn("linkUp: deviceId{} doesn't match with link src {}",
+                     deviceId, link.src().deviceId());
+            return;
+        }
 
-       log.info("* portUpForLink: Device {} linkUp at local port {} to "
-               + "neighbor {}", deviceId, link.src().port(), link.dst().deviceId());
-       // ensure local state is updated even if linkup is aborted later on
-       addNeighborAtPort(link.dst().deviceId(),
-                         link.src().port());
-   }
+        log.info("* portUpForLink: Device {} linkUp at local port {} to "
+                + "neighbor {}", deviceId, link.src().port(), link.dst().deviceId());
+        // ensure local state is updated even if linkup is aborted later on
+        addNeighborAtPort(link.dst().deviceId(),
+                          link.src().port());
+    }
 
-   /**
-    * Updates local stores for port that has gone down.
-    *
-    * @param port port number that has gone down
-    */
-   public void portDown(PortNumber port) {
-       if (portDeviceMap.get(port) == null) {
-           log.warn("portDown: unknown port");
-           return;
-       }
+    /**
+     * Updates local stores for port that has gone down.
+     *
+     * @param port port number that has gone down
+     */
+    public void portDown(PortNumber port) {
+        if (portDeviceMap.get(port) == null) {
+            log.warn("portDown: unknown port");
+            return;
+        }
 
-       log.debug("Device {} portDown {} to neighbor {}", deviceId, port,
-                 portDeviceMap.get(port));
-       devicePortMap.get(portDeviceMap.get(port)).remove(port);
-       portDeviceMap.remove(port);
-   }
+        log.debug("Device {} portDown {} to neighbor {}", deviceId, port,
+                  portDeviceMap.get(port));
+        devicePortMap.get(portDeviceMap.get(port)).remove(port);
+        portDeviceMap.remove(port);
+    }
 
     /**
      * Checks all groups in the src-device of link for neighbor sets that include
@@ -255,7 +267,7 @@
                 dstSet.forEach(dst -> {
                     int edgeLabel = dsKey.destinationSet().getEdgeLabel(dst);
                     addToHashedNextObjective(link.src().port(), dstMac,
-                                             edgeLabel, nextId, false);
+                                             edgeLabel, nextId);
                 });
 
                 if (firstTime) {
@@ -269,8 +281,7 @@
                         }
                         dstSet.forEach(dst -> {
                             int edgeLabel = dsKey.destinationSet().getEdgeLabel(dst);
-                            addToHashedNextObjective(p, dstMac, edgeLabel,
-                                                     nextId, false);
+                            addToHashedNextObjective(p, dstMac, edgeLabel, nextId);
                         });
                     }
                 }
@@ -282,17 +293,6 @@
                 });
             }
         }
-
-        // It's possible that at the time of linkup, some hash-groups have
-        // not been created yet by the instance responsible for creating them, or
-        // due to the eventually-consistent nature of the nsNextObjStore it has
-        // not synced up with this instance yet. Thus we perform this check again
-        // after a delay (see CORD-1180). Duplicate additions to the same hash group
-        // are avoided by the driver.
-        if (!linkDown && firstTime) {
-            executorService.schedule(new RetryHashBkts(link, dstMac),
-                                     RETRY_INTERVAL_SEC, TimeUnit.SECONDS);
-        }
     }
 
     /**
@@ -303,11 +303,10 @@
      * @param dstMac destination mac address of next-hop
      * @param edgeLabel the label to use in the bucket
      * @param nextId id for next-objective to which the bucket will be added
-     * @param retry indicates if this method is being called on a retry attempt
-     *              at adding a bucket to the group
+     *
      */
     private void addToHashedNextObjective(PortNumber outport, MacAddress dstMac,
-            int edgeLabel, Integer nextId, boolean retry) {
+            int edgeLabel, Integer nextId) {
         // Create the new bucket to be updated
         TrafficTreatment.Builder tBuilder =
                 DefaultTrafficTreatment.builder();
@@ -331,63 +330,60 @@
                 .addTreatment(tBuilder.build())
                 .withMeta(metabuilder.build())
                 .fromApp(appId);
-        log.debug("{} in device {}: Adding Bucket with port/label {}/{} to nextId {}",
-                 (retry) ? "retry-addToHash" : "addToHash",
-                         deviceId, outport, edgeLabel, nextId);
+        log.debug("addToHash in device {}: Adding Bucket with port/label {}/{} "
+                + "to nextId {}", deviceId, outport, edgeLabel, nextId);
 
         ObjectiveContext context = new DefaultObjectiveContext(
-                (objective) -> log.debug("{} addedTo NextObj {} on {}",
-                                         (retry) ? "retry-addToHash" : "addToHash",
+                (objective) -> log.debug("addToHash addedTo NextObj {} on {}",
                                          nextId, deviceId),
                 (objective, error) ->
-                        log.warn("{} failed to addTo NextObj {} on {}: {}",
-                                 (retry) ? "retry-addToHash" : "addToHash",
+                        log.warn("addToHash failed to addTo NextObj {} on {}: {}",
                                  nextId, deviceId, error));
         NextObjective nextObjective = nextObjBuilder.addToExisting(context);
         flowObjectiveService.next(deviceId, nextObjective);
     }
 
     /**
-    * Makes a call to the FlowObjective service to remove a single bucket from
-    * a hashed group.
-    *
-    * @param port port to remove from hash group
-    * @param dstMac destination mac address of next-hop
-    * @param edgeLabel the label to use in the bucket
-    * @param nextId id for next-objective from which the bucket will be removed
-    */
-   private void removeFromHashedNextObjective(PortNumber port, MacAddress dstMac,
-                                              int edgeLabel, Integer nextId) {
-       // Create the bucket to be removed
-       TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment
-               .builder();
-       tBuilder.setOutput(port)
-           .setEthDst(dstMac)
-           .setEthSrc(nodeMacAddr);
-       if (edgeLabel != DestinationSet.NO_EDGE_LABEL) {
-           tBuilder.pushMpls()
-               .copyTtlOut()
-               .setMpls(MplsLabel.mplsLabel(edgeLabel));
-       }
-       log.info("{} in device {}: Removing Bucket with Port {} to next object id {}",
-                "removeFromHash", deviceId, port, nextId);
-       NextObjective.Builder nextObjBuilder = DefaultNextObjective
-               .builder()
-               .withType(NextObjective.Type.HASHED) //same as original
-               .withId(nextId)
-               .fromApp(appId)
-               .addTreatment(tBuilder.build());
-       ObjectiveContext context = new DefaultObjectiveContext(
-           (objective) -> log.debug("port {} removedFrom NextObj {} on {}",
-                                    port, nextId, deviceId),
-           (objective, error) ->
-           log.warn("port {} failed to removeFrom NextObj {} on {}: {}",
-                    port, nextId, deviceId, error));
-       NextObjective nextObjective = nextObjBuilder.
-               removeFromExisting(context);
+     * Makes a call to the FlowObjective service to remove a single bucket from
+     * a hashed group.
+     *
+     * @param port port to remove from hash group
+     * @param dstMac destination mac address of next-hop
+     * @param edgeLabel the label to use in the bucket
+     * @param nextId id for next-objective from which the bucket will be removed
+     */
+    private void removeFromHashedNextObjective(PortNumber port, MacAddress dstMac,
+                                               int edgeLabel, Integer nextId) {
+        // Create the bucket to be removed
+        TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment
+                .builder();
+        tBuilder.setOutput(port)
+        .setEthDst(dstMac)
+        .setEthSrc(nodeMacAddr);
+        if (edgeLabel != DestinationSet.NO_EDGE_LABEL) {
+            tBuilder.pushMpls()
+            .copyTtlOut()
+            .setMpls(MplsLabel.mplsLabel(edgeLabel));
+        }
+        log.info("{} in device {}: Removing Bucket with Port {} to next object id {}",
+                 "removeFromHash", deviceId, port, nextId);
+        NextObjective.Builder nextObjBuilder = DefaultNextObjective
+                .builder()
+                .withType(NextObjective.Type.HASHED) //same as original
+                .withId(nextId)
+                .fromApp(appId)
+                .addTreatment(tBuilder.build());
+        ObjectiveContext context = new DefaultObjectiveContext(
+                (objective) -> log.debug("port {} removedFrom NextObj {} on {}",
+                                         port, nextId, deviceId),
+                (objective, error) ->
+                log.warn("port {} failed to removeFrom NextObj {} on {}: {}",
+                         port, nextId, deviceId, error));
+        NextObjective nextObjective = nextObjBuilder.
+                removeFromExisting(context);
 
-       flowObjectiveService.next(deviceId, nextObjective);
-   }
+        flowObjectiveService.next(deviceId, nextObjective);
+    }
 
     /**
      * Checks all the hash-groups in the target-switch meant for the destination
@@ -468,9 +464,7 @@
                         log.info("fixHashGroup in device {}: Adding Bucket "
                                 + "with Port {} to next object id {}",
                                 deviceId, port, nextId);
-                        addToHashedNextObjective(port, dstMac,
-                                                 edgeLabel,
-                                                 nextId, false);
+                        addToHashedNextObjective(port, dstMac, edgeLabel, nextId);
                     }
                     // to update neighbor set with changes made
                     tempStore.put(dskey, Sets.union(currNeighbors, diff));
@@ -500,8 +494,18 @@
         return true;
     }
 
-
-    public boolean updateNextHops(DestinationSet ds,
+    /**
+     * Updates the DestinationSetNextObjectiveStore with any per-destination nexthops
+     * that are not already in the store for the given DestinationSet. Note that
+     * this method does not remove existing next hops for the destinations in the
+     * DestinationSet.
+     *
+     * @param ds the DestinationSet for which the next hops need to be updated
+     * @param newDstNextHops a map of per-destination next hops to update the
+     *                          destinationSet with
+     * @return true if successful in updating all next hops
+     */
+    private boolean updateNextHops(DestinationSet ds,
                                   Map<DeviceId, Set<DeviceId>> newDstNextHops) {
         DestinationSetNextObjectiveStoreKey key =
                 new DestinationSetNextObjectiveStoreKey(deviceId, ds);
@@ -538,9 +542,20 @@
         return success;
     }
 
-    private boolean updateAllPortsToNextHop(Set<DeviceId> diff, int edgeLabel,
+    /**
+     * Adds or removes buckets for all ports to a set of neighbor devices.
+     *
+     * @param neighbors set of neighbor device ids
+     * @param edgeLabel MPLS label to use in buckets
+     * @param nextId the nextObjective to change
+     * @param revoke true if buckets need to be removed, false if they need to
+     *          be added
+     * @return true if successful in adding or removing buckets for all ports
+     *                  to the neighbors
+     */
+    private boolean updateAllPortsToNextHop(Set<DeviceId> neighbors, int edgeLabel,
                                          int nextId, boolean revoke) {
-        for (DeviceId neighbor : diff) {
+        for (DeviceId neighbor : neighbors) {
             MacAddress dstMac;
             try {
                 dstMac = deviceConfig.getDeviceMac(neighbor);
@@ -570,16 +585,13 @@
                     log.debug("fixHashGroup in device {}: Adding Bucket "
                             + "with Port {} edgeLabel: {} to next object id {}",
                             deviceId, port, edgeLabel, nextId);
-                    addToHashedNextObjective(port, dstMac,
-                                             edgeLabel,
-                                             nextId, false);
+                    addToHashedNextObjective(port, dstMac, edgeLabel, nextId);
                 }
             }
         }
         return true;
     }
 
-
     /**
      * Adds or removes a port that has been configured with a vlan to a broadcast group
      * for bridging. Should only be called by the master instance for this device.
@@ -1101,46 +1113,115 @@
         }
     }*/ //XXX revisit
 
+    /**
+     * Runs a single bucket verification pass over all hash groups on this
+     * device, synchronously in the calling thread.
+     */
+    public void triggerBucketCorrector() {
+        BucketCorrector oneShot = new BucketCorrector();
+        oneShot.run();
+    }
+
 
     /**
-     * RetryHashBkts is a one-time retry at populating all the buckets of a
-     * hash group based on the given link. Should only be called by the
-     * master instance of the src-device of the link.
+     * Verifies the buckets of all hash groups on this device, or only of the
+     * group with the given nextId, by sending a VERIFY next-objective built
+     * from the next hops in dsNextObjStore and the ports in devicePortMap.
+     * Runs only on the master of the device and only when routing is stable.
      */
-    protected final class RetryHashBkts implements Runnable {
-        Link link;
-        MacAddress dstMac;
+    protected final class BucketCorrector implements Runnable {
+        Integer nextId; // null means verify every hash group on this device
 
-        private RetryHashBkts(Link link, MacAddress dstMac) {
-            this.link = link;
-            this.dstMac = dstMac;
+        BucketCorrector() {
+            this.nextId = null;
+        }
+
+        BucketCorrector(Integer nextId) {
+            this.nextId = nextId;
         }
 
         @Override
         public void run() {
-            log.debug("RETRY Hash buckets for linkup: {}", link);
-            Set<DestinationSetNextObjectiveStoreKey> dsKeySet = dsNextObjStore.entrySet()
-                    .stream()
-                    .filter(entry -> entry.getKey().deviceId().equals(deviceId))
-                    .filter(entry -> entry.getValue().containsNextHop(link.dst().deviceId()))
-                    .map(entry -> entry.getKey())
-                    .collect(Collectors.toSet());
-
-            log.debug("retry-link: dsNextObjStore contents for device {}: {}",
-                      deviceId, dsKeySet);
-            for (DestinationSetNextObjectiveStoreKey dsKey : dsKeySet) {
-                NextNeighbors next = dsNextObjStore.get(dsKey);
-                if (next != null) {
-                    Set<DeviceId> dstSet = next.getDstForNextHop(link.dst().deviceId());
-                    dstSet.forEach(dst -> {
-                        int edgeLabel = dsKey.destinationSet().getEdgeLabel(dst);
-                        addToHashedNextObjective(link.src().port(), dstMac, edgeLabel,
-                                                 next.nextId(), true);
-                    });
-                }
+            if (!srManager.mastershipService.isLocalMaster(deviceId)) {
+                return;
             }
+            DefaultRoutingHandler rh = srManager.getRoutingHandler();
+            if (rh == null || !rh.isRoutingStable()) {
+                return;
+            }
+            rh.acquireRoutingLock();
+            try {
+                log.debug("running bucket corrector for dev: {}", deviceId);
+                Set<DestinationSetNextObjectiveStoreKey> dsKeySet = dsNextObjStore.entrySet()
+                        .stream()
+                        .filter(entry -> entry.getKey().deviceId().equals(deviceId))
+                        .map(entry -> entry.getKey())
+                        .collect(Collectors.toSet());
+                for (DestinationSetNextObjectiveStoreKey dsKey : dsKeySet) {
+                    NextNeighbors next = dsNextObjStore.get(dsKey);
+                    if (next == null) {
+                        continue;
+                    }
+                    int nid = next.nextId();
+                    if (nextId != null && nextId != nid) {
+                        continue;
+                    }
+                    log.debug("bkt-corr: dsNextObjStore for device {}: {} -> {}",
+                              deviceId, dsKey, next);
+                    TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
+                    metabuilder.matchVlanId(INTERNAL_VLAN);
+                    NextObjective.Builder nextObjBuilder = DefaultNextObjective.builder()
+                            .withId(nid)
+                            .withType(NextObjective.Type.HASHED)
+                            .withMeta(metabuilder.build())
+                            .fromApp(appId);
+
+                    next.dstNextHops().forEach((dstDev, nextHops) -> {
+                        int edgeLabel = dsKey.destinationSet().getEdgeLabel(dstDev);
+                        nextHops.forEach(neighbor -> {
+                            MacAddress neighborMac;
+                            try {
+                                neighborMac = deviceConfig.getDeviceMac(neighbor);
+                            } catch (DeviceConfigNotFoundException e) {
+                                log.warn("{} Aborting neighbor {}", e.getMessage(), neighbor);
+                                return;
+                            }
+                            Set<PortNumber> ports = devicePortMap.get(neighbor);
+                            if (ports == null) {
+                                return; // no ports known for this neighbor; nothing to add
+                            }
+                            ports.forEach(port -> {
+                                log.debug("verify in device {} nextId {}: bucket with"
+                                        + " port/label {}/{} to dst {} via {}",
+                                        deviceId, nid, port, edgeLabel,
+                                        dstDev, neighbor);
+                                nextObjBuilder.addTreatment(treatmentBuilder(port,
+                                                                neighborMac, edgeLabel));
+                            });
+                        });
+                    });
+
+                    NextObjective nextObjective = nextObjBuilder.verify();
+                    flowObjectiveService.next(deviceId, nextObjective);
+                }
+            } finally {
+                rh.releaseRoutingLock();
+            }
+        }
+
+        TrafficTreatment treatmentBuilder(PortNumber outport, MacAddress dstMac,
+                                          int edgeLabel) {
+            TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+            tBuilder.setOutput(outport)
+                .setEthDst(dstMac)
+                .setEthSrc(nodeMacAddr);
+            if (edgeLabel != DestinationSet.NO_EDGE_LABEL) {
+                tBuilder.pushMpls()
+                    .copyTtlOut()
+                    .setMpls(MplsLabel.mplsLabel(edgeLabel));
+            }
+            return tBuilder.build();
         }
     }
 
-
-}
+}
\ No newline at end of file