Follow up for CORD-2393.

After follow up discussion with Charles we
decided that this is probably a safer approach
for handling the group checker concurrency issues.

Change-Id: Ifb9048fcd27274c5f7c1287671b3bfd0edb85ba2
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/CpqdOfdpa2Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/CpqdOfdpa2Pipeline.java
index 140513c..72b2e8a 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/CpqdOfdpa2Pipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/CpqdOfdpa2Pipeline.java
@@ -17,6 +17,7 @@
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import org.onlab.packet.EthType;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
@@ -76,6 +77,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.onlab.packet.IPv6.PROTOCOL_ICMP6;
 import static org.onlab.packet.MacAddress.BROADCAST;
@@ -123,7 +125,12 @@
     /**
      * Queue for passing pop vlan punt group flow rules to the GroupChecker thread.
      */
-    private Queue<FlowRule> flowRuleQueue = new ConcurrentLinkedQueue<>();
+    private Queue<FlowRule> flowRuleQueue;
+
+    /**
+     * Lock used in synchronizing driver thread with groupCheckerThread.
+     */
+    private ReentrantLock groupCheckerLock;
 
     @Override
     protected boolean requireVlanExtensions() {
@@ -176,11 +183,15 @@
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
 
-        // create a new executor at each init
-        groupChecker = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/driver",
-                "cpqd-ofdpa-%d", log));
-        groupChecker.scheduleAtFixedRate(new PopVlanPuntGroupChecker(), 100, 300, TimeUnit.MILLISECONDS);
-        super.init(deviceId, context);
+        if (supportPuntGroup()) {
+            // create a new executor at each init and a new empty queue
+            groupChecker = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/driver",
+                                                                                     "cpqd-ofdpa-%d", log));
+            flowRuleQueue = new ConcurrentLinkedQueue<>();
+            groupCheckerLock = new ReentrantLock();
+            groupChecker.scheduleAtFixedRate(new PopVlanPuntGroupChecker(), 20, 50, TimeUnit.MILLISECONDS);
+            super.init(deviceId, context);
+        }
     }
     /*
      * Cpqd emulation does not require the non OF-standard rules for
@@ -227,19 +238,27 @@
             // NOTE: Emulating OFDPA behavior by popping off internal assigned
             //       VLAN before sending to controller
             if (supportPuntGroup() && vidCriterion.vlanId() == VlanId.NONE) {
-                GroupKey groupKey = popVlanPuntGroupKey();
-                Group group = groupService.getGroup(deviceId, groupKey);
-                if (group != null) {
-                    if (!groupChecker.isShutdown()) {
-                        groupChecker.shutdown();
+                try {
+                    groupCheckerLock.lock();
+                    if (flowRuleQueue == null) {
+                        // this means that the group has been created
+                        // and that groupChecker has destroyed the queue
+                        log.debug("Installing punt table rule for untagged port {} and vlan {}.",
+                                  pnum, assignedVlan);
+                        rules.add(buildPuntTableRule(pnum, assignedVlan));
+                    } else {
+                        // The VLAN punt group may be held back due to device initial audit.
+                        // In that case, we queue all punt table flow until the group has been created.
+                        log.debug("popVlanPuntGroup not found in dev:{}, queueing this flow rule.", deviceId);
+                        flowRuleQueue.add(buildPuntTableRule(pnum, assignedVlan));
                     }
-                    rules.add(buildPuntTableRule(pnum, assignedVlan));
-                } else {
-                    // The VLAN punt group may be held back due to device initial audit.
-                    // In that case, we queue all punt table flow until the group has been created.
-                    log.info("popVlanPuntGroup not found in dev:{}, queueing this flow rule.", deviceId);
-                    flowRuleQueue.add(buildPuntTableRule(pnum, assignedVlan));
+                } finally {
+                    groupCheckerLock.unlock();
                 }
+            } else if (vidCriterion.vlanId() != VlanId.NONE) {
+                // for tagged ports just forward to the controller
+                log.debug("Installing punt rule for tagged port {} and vlan {}.", pnum, vidCriterion.vlanId());
+                rules.add(buildPuntTableRuleTagged(pnum, vidCriterion.vlanId()));
             }
 
             // create rest of flowrule
@@ -284,6 +303,30 @@
     }
 
     /**
+     * Creates punt table entry that matches IN_PORT and VLAN_VID and forwards
+     * packet to controller tagged.
+     *
+     * @param portNumber port number
+     * @param packetVlan vlan tag of the packet
+     * @return punt table flow rule
+     */
+    private FlowRule buildPuntTableRuleTagged(PortNumber portNumber, VlanId packetVlan) {
+        TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder()
+                .matchInPort(portNumber)
+                .matchVlanId(packetVlan);
+        TrafficTreatment.Builder tbuilder = DefaultTrafficTreatment.builder().punt();
+
+        return DefaultFlowRule.builder()
+                .forDevice(deviceId)
+                .withSelector(sbuilder.build())
+                .withTreatment(tbuilder.build())
+                .withPriority(PacketPriority.CONTROL.priorityValue())
+                .fromApp(driverId)
+                .makePermanent()
+                .forTable(PUNT_TABLE).build();
+    }
+
+    /**
      * Builds a punt to the controller rule for the arp protocol.
      * <p>
      * NOTE: CpqD cannot punt correctly in group bucket. The current impl will
@@ -805,10 +848,9 @@
         initTableMiss(MPLS_TABLE_1, ACL_TABLE, null);
         initTableMiss(BRIDGING_TABLE, ACL_TABLE, null);
         initTableMiss(ACL_TABLE, -1, null);
+        linkDiscoveryPuntTableRules();
 
         if (supportPuntGroup()) {
-            initTableMiss(PUNT_TABLE, -1,
-                    DefaultTrafficTreatment.builder().punt().build());
             initPopVlanPuntGroup();
         } else {
             initTableMiss(PUNT_TABLE, -1,
@@ -862,6 +904,51 @@
     }
 
     /**
+     * Install lldp/bbdp matching rules at table PUNT_TABLE
+     * that forward traffic to controller.
+     *
+     */
+    private void linkDiscoveryPuntTableRules() {
+        FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
+        TrafficTreatment treatment =  DefaultTrafficTreatment.builder().punt().build();
+
+        TrafficSelector.Builder lldpSelector = DefaultTrafficSelector.builder();
+        lldpSelector.matchEthType(EthType.EtherType.LLDP.ethType().toShort());
+        FlowRule lldpRule = DefaultFlowRule.builder()
+                .forDevice(deviceId)
+                .withSelector(lldpSelector.build())
+                .withTreatment(treatment)
+                .withPriority(HIGHEST_PRIORITY)
+                .fromApp(driverId)
+                .makePermanent()
+                .forTable(PUNT_TABLE).build();
+        ops =  ops.add(lldpRule);
+
+        TrafficSelector.Builder bbdpSelector = DefaultTrafficSelector.builder();
+        bbdpSelector.matchEthType(EthType.EtherType.BDDP.ethType().toShort());
+        FlowRule bbdpRule = DefaultFlowRule.builder()
+                .forDevice(deviceId)
+                .withSelector(bbdpSelector.build())
+                .withTreatment(treatment)
+                .withPriority(HIGHEST_PRIORITY)
+                .fromApp(driverId)
+                .makePermanent()
+                .forTable(PUNT_TABLE).build();
+        ops.add(bbdpRule);
+
+        flowRuleService.apply(ops.build(new FlowRuleOperationsContext() {
+            @Override
+            public void onSuccess(FlowRuleOperations ops) {
+                log.info("Added lldp/bbdp rules for table {} on {}", PUNT_TABLE, deviceId);
+            }
+            @Override
+            public void onError(FlowRuleOperations ops) {
+                log.warn("Failed to initialize lldp/bbdp rules for table {} on {}", PUNT_TABLE, deviceId);
+            }
+        }));
+    }
+
+    /**
      * Builds a indirect group contains pop_vlan and punt actions.
      * <p>
      * Using group instead of immediate action to ensure that
@@ -900,31 +987,42 @@
     private class PopVlanPuntGroupChecker implements Runnable {
         @Override
         public void run() {
+            try {
+                groupCheckerLock.lock();
+                // this can happen outside of the lock but I think it is safer
+                // to include it here.
+                Group group = groupService.getGroup(deviceId, popVlanPuntGroupKey());
+                if (group != null) {
+                    log.debug("PopVlanPuntGroupChecker: Installing {} missing rules at punt table.",
+                              flowRuleQueue.size());
 
-            Group group = groupService.getGroup(deviceId, popVlanPuntGroupKey());
-            if (group != null) {
-                // shutdown the executor
-                if (!groupChecker.isShutdown()) {
+                    // if we have pending flow rules install them
+                    if (flowRuleQueue.size() > 0) {
+                        FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
+                        // we should not care about the context here, it can only be add
+                        // since when removing the rules the group should be there already.
+                        flowRuleQueue.forEach(ops::add);
+                        flowRuleService.apply(ops.build(new FlowRuleOperationsContext() {
+                            @Override
+                            public void onSuccess(FlowRuleOperations ops) {
+                                log.debug("Applied {} pop vlan punt rules in device {}",
+                                          ops.stages().get(0).size(), deviceId);
+                            }
+
+                            @Override
+                            public void onError(FlowRuleOperations ops) {
+                                log.error("Failed to apply all pop vlan punt rules in dev {}", deviceId);
+                            }
+                        }));
+                    }
+                    // this signifies that the group is created and now
+                    // flow rules can be installed directly
+                    flowRuleQueue = null;
+                    // shutdown the group checker gracefully
                     groupChecker.shutdown();
                 }
-                // if we have pending flow rules install them
-                if (flowRuleQueue.size() > 0) {
-                    FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
-                    // we should not care about the context here, it can only be add
-                    // since when removing the rules the group should be there already.
-                    flowRuleQueue.forEach(ops::add);
-                    flowRuleService.apply(ops.build(new FlowRuleOperationsContext() {
-                        @Override
-                        public void onSuccess(FlowRuleOperations ops) {
-                            log.debug("Applied {} pop vlan punt rules in device {}",
-                                      ops.stages().get(0).size(), deviceId);
-                        }
-                        @Override
-                        public void onError(FlowRuleOperations ops) {
-                            log.error("Failed to apply all pop vlan punt rules in dev {}", deviceId);
-                        }
-                    }));
-                }
+            } finally {
+                groupCheckerLock.unlock();
             }
         }
     }