CORD-414 Editing hash group buckets in the OF-DPA driver instead of
creating new groups.
Also in this commit: a fix for an NPE in the groups CLI, and removal of the
unnecessary cpqd-ofdpa3 driver.

Change-Id: I2a5dd183cb38ed901caa5a806791b77e9d92d93c
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
index a72eea1..4600eb8 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
@@ -15,6 +15,8 @@
  */
 package org.onosproject.driver.pipeline;
 
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.ArrayDeque;
@@ -26,6 +28,8 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -131,6 +135,11 @@
 
     protected Set<IPCriterion> sentIpFilters = Collections.newSetFromMap(
                                                new ConcurrentHashMap<>());
+    // flow installations to be retried
+    protected ScheduledExecutorService executorService
+        = newScheduledThreadPool(5, groupedThreads("OfdpaPipeliner", "retry-%d", log));
+    protected static final int MAX_RETRY_ATTEMPTS = 10;
+    protected static final int RETRY_MS = 1000;
 
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
@@ -181,30 +190,33 @@
 
     @Override
     public void forward(ForwardingObjective fwd) {
-        Collection<FlowRule> rules;
-        FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
-
-        rules = processForward(fwd);
+        Collection<FlowRule> rules = processForward(fwd);
         if (rules == null || rules.isEmpty()) {
             // Assumes fail message has already been generated to the objective
             // context. Returning here prevents spurious pass message to be
             // generated by FlowRule service for empty flowOps.
             return;
         }
+        sendForward(fwd, rules);
+    }
+
+    protected void sendForward(ForwardingObjective fwd, Collection<FlowRule> rules) {
+        FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
         switch (fwd.op()) {
-            case ADD:
-                rules.stream()
-                        .filter(Objects::nonNull)
-                        .forEach(flowOpsBuilder::add);
-                break;
-            case REMOVE:
-                rules.stream()
-                        .filter(Objects::nonNull)
-                        .forEach(flowOpsBuilder::remove);
-                break;
-            default:
-                fail(fwd, ObjectiveError.UNKNOWN);
-                log.warn("Unknown forwarding type {}", fwd.op());
+        case ADD:
+            rules.stream()
+            .filter(Objects::nonNull)
+            .forEach(flowOpsBuilder::add);
+            log.info("Applying a flow rule to sw:{}", deviceId);
+            break;
+        case REMOVE:
+            rules.stream()
+            .filter(Objects::nonNull)
+            .forEach(flowOpsBuilder::remove);
+            break;
+        default:
+            fail(fwd, ObjectiveError.UNKNOWN);
+            log.warn("Unknown forwarding type {}", fwd.op());
         }
 
         flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
@@ -802,24 +814,6 @@
         }
     }
 
-    private boolean isSupportedEthTypeObjective(ForwardingObjective fwd) {
-        TrafficSelector selector = fwd.selector();
-        EthTypeCriterion ethType = (EthTypeCriterion) selector
-                .getCriterion(Criterion.Type.ETH_TYPE);
-        return !((ethType == null) ||
-                ((ethType.ethType().toShort() != Ethernet.TYPE_IPV4) &&
-                        (ethType.ethType().toShort() != Ethernet.MPLS_UNICAST)));
-    }
-
-    private boolean isSupportedEthDstObjective(ForwardingObjective fwd) {
-        TrafficSelector selector = fwd.selector();
-        EthCriterion ethDst = (EthCriterion) selector
-                .getCriterion(Criterion.Type.ETH_DST);
-        VlanIdCriterion vlanId = (VlanIdCriterion) selector
-                .getCriterion(Criterion.Type.VLAN_VID);
-        return !(ethDst == null && vlanId == null);
-    }
-
     /**
      * Handles forwarding rules to the IP and MPLS tables.
      *
@@ -850,6 +844,7 @@
                 (EthTypeCriterion) selector.getCriterion(Criterion.Type.ETH_TYPE);
         boolean defaultRule = false;
         boolean popMpls = false;
+        boolean emptyGroup = false;
         int forTableId;
         TrafficSelector.Builder filteredSelector = DefaultTrafficSelector.builder();
         TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder();
@@ -967,6 +962,12 @@
                     return Collections.emptySet();
                 }
                 tb.deferred().group(group.id());
+                // check if group is empty
+                if (gkeys.size() == 1 && gkeys.get(0).size() == 1) {
+                    log.warn("Found empty group 0x{} in dev:{} .. will retry fwd:{}",
+                             Integer.toHexString(group.id().id()), deviceId, fwd.id());
+                    emptyGroup = true;
+                }
             } else {
                 log.warn("Cannot find group for nextId:{} in dev:{}. Aborting fwd:{}",
                          fwd.nextId(), deviceId, fwd.id());
@@ -1016,7 +1017,11 @@
             flowRuleCollection.add(rule.build());
             log.debug("Default rule 0.0.0.0/0 is being installed two rules");
         }
-
+        // XXX retrying flows may be necessary due to bug CORD-554
+        if (emptyGroup) {
+            executorService.schedule(new RetryFlows(fwd, flowRuleCollection),
+                                     RETRY_MS, TimeUnit.MILLISECONDS);
+        }
         return flowRuleCollection;
     }
 
@@ -1103,6 +1108,28 @@
         return rules;
     }
 
+    //////////////////////////////////////
+    //  Helper Methods and Classes
+    //////////////////////////////////////
+
+    private boolean isSupportedEthTypeObjective(ForwardingObjective fwd) {
+        TrafficSelector selector = fwd.selector();
+        EthTypeCriterion ethType = (EthTypeCriterion) selector
+                .getCriterion(Criterion.Type.ETH_TYPE);
+        return !((ethType == null) ||
+                ((ethType.ethType().toShort() != Ethernet.TYPE_IPV4) &&
+                        (ethType.ethType().toShort() != Ethernet.MPLS_UNICAST)));
+    }
+
+    private boolean isSupportedEthDstObjective(ForwardingObjective fwd) {
+        TrafficSelector selector = fwd.selector();
+        EthCriterion ethDst = (EthCriterion) selector
+                .getCriterion(Criterion.Type.ETH_DST);
+        VlanIdCriterion vlanId = (VlanIdCriterion) selector
+                .getCriterion(Criterion.Type.VLAN_VID);
+        return !(ethDst == null && vlanId == null);
+    }
+
     protected NextGroup getGroupForNextObjective(Integer nextId) {
         NextGroup next = flowObjectiveStore.getNextGroup(nextId);
         if (next != null) {
@@ -1147,7 +1174,7 @@
             }
             // add port information for last group in group-chain
             List<Instruction> lastGroupIns = new ArrayList<Instruction>();
-            if (lastGroup != null) {
+            if (lastGroup != null && !lastGroup.buckets().buckets().isEmpty()) {
                 lastGroupIns = lastGroup.buckets().buckets().get(0)
                                     .treatment().allInstructions();
             }
@@ -1189,4 +1216,30 @@
         }
         return null;
     }
+
+    /**
+     *  Utility class that retries sending flows a fixed number of times, even if
+     *  some of the attempts are successful. Used only for forwarding objectives.
+     */
+    protected final class RetryFlows implements Runnable {
+        int attempts = MAX_RETRY_ATTEMPTS;
+        private Collection<FlowRule> retryFlows;
+        private ForwardingObjective fwd;
+
+        RetryFlows(ForwardingObjective fwd, Collection<FlowRule> retryFlows) {
+            this.fwd = fwd;
+            this.retryFlows = retryFlows;
+        }
+
+        @Override
+        public void run() {
+            log.info("RETRY FLOWS ATTEMPT# {} for fwd:{} rules:{}",
+                     MAX_RETRY_ATTEMPTS - attempts, fwd.id(), retryFlows.size());
+            sendForward(fwd, retryFlows);
+            if (--attempts > 0) {
+                executorService.schedule(this, RETRY_MS, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
 }