CORD-48 Added support for broadcast next objective in OFDPA driver.
Changed groupid to show in hex for cli command 'groups'

Change-Id: I86474912a9fd775c36d5bc49545eaa58ecc46b47
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
index 1af72a8..55b556e 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
@@ -577,6 +577,7 @@
 
             ports.forEach(port -> {
                 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+                tBuilder.popVlan();
                 tBuilder.setOutput(port);
                 nextObjBuilder.addTreatment(tBuilder.build());
             });
diff --git a/cli/src/main/java/org/onosproject/cli/net/GroupsListCommand.java b/cli/src/main/java/org/onosproject/cli/net/GroupsListCommand.java
index b10621d..8e30fa4 100644
--- a/cli/src/main/java/org/onosproject/cli/net/GroupsListCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/GroupsListCommand.java
@@ -121,11 +121,11 @@
     private void printGroups(DeviceId deviceId, List<Group> groups) {
         print("deviceId=%s", deviceId);
         for (Group group : groups) {
-            print(FORMAT, group.id().id(), group.state(), group.type(),
+            print(FORMAT, Integer.toHexString(group.id().id()), group.state(), group.type(),
                   group.bytes(), group.packets(), group.appId().name());
             int i = 0;
             for (GroupBucket bucket:group.buckets().buckets()) {
-                print(BUCKET_FORMAT, group.id().id(), ++i,
+                print(BUCKET_FORMAT, Integer.toHexString(group.id().id()), ++i,
                       bucket.bytes(), bucket.packets(),
                       bucket.treatment().allInstructions());
             }
diff --git a/core/api/src/main/java/org/onosproject/net/flow/instructions/Instructions.java b/core/api/src/main/java/org/onosproject/net/flow/instructions/Instructions.java
index 4e5d39a..126e722 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/instructions/Instructions.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/instructions/Instructions.java
@@ -630,7 +630,8 @@
         @Override
         public String toString() {
             return toStringHelper(type().toString())
-                    .add("group ID", groupId.id()).toString();
+                    .addValue("group ID=0x" + Integer.toHexString(groupId.id()))
+                    .toString();
         }
 
         @Override
diff --git a/drivers/src/main/java/org/onosproject/driver/pipeline/CpqdOFDPA2Pipeline.java b/drivers/src/main/java/org/onosproject/driver/pipeline/CpqdOFDPA2Pipeline.java
index 8f976da..0cb30d2 100644
--- a/drivers/src/main/java/org/onosproject/driver/pipeline/CpqdOFDPA2Pipeline.java
+++ b/drivers/src/main/java/org/onosproject/driver/pipeline/CpqdOFDPA2Pipeline.java
@@ -18,7 +18,10 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.onlab.packet.VlanId;
 import org.onosproject.core.ApplicationId;
@@ -54,11 +57,16 @@
         TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
         TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
         selector.matchVlanId(vidCriterion.vlanId());
+        treatment.transition(TMAC_TABLE);
+
+        VlanId storeVlan = null;
         if (vidCriterion.vlanId() == VlanId.NONE) {
             // untagged packets are assigned vlans
             treatment.pushVlan().setVlanId(assignedVlan);
+            storeVlan = assignedVlan;
+        } else {
+            storeVlan = vidCriterion.vlanId();
         }
-        treatment.transition(TMAC_TABLE);
 
         // ofdpa cannot match on ALL portnumber, so we need to use separate
         // rules for each port.
@@ -72,7 +80,20 @@
         } else {
             portnums.add(portCriterion.port());
         }
+
         for (PortNumber pnum : portnums) {
+            // update storage
+            port2Vlan.put(pnum, storeVlan);
+            Set<PortNumber> vlanPorts = vlan2Port.get(storeVlan);
+            if (vlanPorts == null) {
+                vlanPorts = Collections.newSetFromMap(
+                                    new ConcurrentHashMap<PortNumber, Boolean>());
+                vlanPorts.add(pnum);
+                vlan2Port.put(storeVlan, vlanPorts);
+            } else {
+                vlanPorts.add(pnum);
+            }
+            // create rest of flowrule
             selector.matchInPort(pnum);
             FlowRule rule = DefaultFlowRule.builder()
                     .forDevice(deviceId)
diff --git a/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java b/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java
index e63a404..cf3c7e8 100644
--- a/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java
+++ b/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java
@@ -23,11 +23,13 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import org.onlab.osgi.ServiceDirectory;
@@ -147,6 +149,7 @@
     private static final int L3UNICASTMASK = 0x20000000;
     //private static final int MPLSINTERFACEMASK = 0x90000000;
     private static final int L3ECMPMASK = 0x70000000;
+    private static final int L2FLOODMASK = 0x40000000;
 
     private final Logger log = getLogger(getClass());
     private ServiceDirectory serviceDirectory;
@@ -176,6 +179,13 @@
     private Set<IPCriterion> sentIpFilters = Collections.newSetFromMap(
                                                new ConcurrentHashMap<IPCriterion, Boolean>());
 
+    // local stores for port-vlan mapping
+    Map<PortNumber, VlanId> port2Vlan = new ConcurrentHashMap<PortNumber, VlanId>();
+    Map<VlanId, Set<PortNumber>> vlan2Port = new ConcurrentHashMap<VlanId,
+                                                        Set<PortNumber>>();
+
+
+
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
         this.serviceDirectory = context.directory();
@@ -275,26 +285,23 @@
 
     @Override
     public void next(NextObjective nextObjective) {
-        switch (nextObjective.type()) {
-        case SIMPLE:
-            Collection<TrafficTreatment> treatments = nextObjective.next();
-            if (treatments.size() != 1) {
-                log.error("Next Objectives of type Simple should only have a "
-                        + "single Traffic Treatment. Next Objective Id:{}", nextObjective.id());
-               fail(nextObjective, ObjectiveError.BADPARAMS);
-               return;
+        log.debug("Processing NextObjective id{} op{}", nextObjective.id(),
+                  nextObjective.op());
+        if (nextObjective.op() == Objective.Operation.REMOVE) {
+            if (nextObjective.next().isEmpty()) {
+                removeGroup(nextObjective);
+            } else {
+                removeBucketFromGroup(nextObjective);
             }
-            processSimpleNextObjective(nextObjective);
-            break;
-        case HASHED:
-        case BROADCAST:
-        case FAILOVER:
-            fail(nextObjective, ObjectiveError.UNSUPPORTED);
-            log.warn("Unsupported next objective type {}", nextObjective.type());
-            break;
-        default:
-            fail(nextObjective, ObjectiveError.UNKNOWN);
-            log.warn("Unknown next objective type {}", nextObjective.type());
+        } else if (nextObjective.op() == Objective.Operation.ADD) {
+            NextGroup nextGroup = flowObjectiveStore.getNextGroup(nextObjective.id());
+            if (nextGroup != null) {
+                addBucketToGroup(nextObjective);
+            } else {
+                addGroup(nextObjective);
+            }
+        } else {
+            log.warn("Unsupported operation {}", nextObjective.op());
         }
     }
 
@@ -302,6 +309,7 @@
     //  Flow handling
     //////////////////////////////////////
 
+
     /**
      * As per OFDPA 2.0 TTP, filtering of VLAN ids, MAC addresses (for routing)
      * and IP addresses configured on switch ports happen in different tables.
@@ -455,14 +463,19 @@
         TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
         TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
         selector.matchVlanId(vidCriterion.vlanId());
+        treatment.transition(TMAC_TABLE);
+
+        VlanId storeVlan = null;
         if (vidCriterion.vlanId() == VlanId.NONE) {
             // untagged packets are assigned vlans
             treatment.pushVlan().setVlanId(assignedVlan);
             // XXX ofdpa will require an additional vlan match on the assigned vlan
             // and it may not require the push. This is not in compliance with OF
             // standard. Waiting on what the exact flows are going to look like.
+            storeVlan = assignedVlan;
+        } else {
+            storeVlan = vidCriterion.vlanId();
         }
-        treatment.transition(TMAC_TABLE);
 
         // ofdpa cannot match on ALL portnumber, so we need to use separate
         // rules for each port.
@@ -476,7 +489,20 @@
         } else {
             portnums.add(portCriterion.port());
         }
+
         for (PortNumber pnum : portnums) {
+            // update storage
+            port2Vlan.put(pnum, storeVlan);
+            Set<PortNumber> vlanPorts = vlan2Port.get(storeVlan);
+            if (vlanPorts == null) {
+                vlanPorts = Collections.newSetFromMap(
+                                    new ConcurrentHashMap<PortNumber, Boolean>());
+                vlanPorts.add(pnum);
+                vlan2Port.put(storeVlan, vlanPorts);
+            } else {
+                vlanPorts.add(pnum);
+            }
+            // create rest of flowrule
             selector.matchInPort(pnum);
             FlowRule rule = DefaultFlowRule.builder()
                     .forDevice(deviceId)
@@ -708,10 +734,39 @@
     //  Group handling
     //////////////////////////////////////
 
+    private void addGroup(NextObjective nextObjective) {
+        switch (nextObjective.type()) {
+        case SIMPLE:
+            Collection<TrafficTreatment> treatments = nextObjective.next();
+            if (treatments.size() != 1) {
+                log.error("Next Objectives of type Simple should only have a "
+                        + "single Traffic Treatment. Next Objective Id:{}",
+                        nextObjective.id());
+               fail(nextObjective, ObjectiveError.BADPARAMS);
+               return;
+            }
+            processSimpleNextObjective(nextObjective);
+            break;
+        case BROADCAST:
+            processBroadcastNextObjective(nextObjective);
+            break;
+        case HASHED:
+            processHashedNextObjective(nextObjective);
+            break;
+        case FAILOVER:
+            fail(nextObjective, ObjectiveError.UNSUPPORTED);
+            log.warn("Unsupported next objective type {}", nextObjective.type());
+            break;
+        default:
+            fail(nextObjective, ObjectiveError.UNKNOWN);
+            log.warn("Unknown next objective type {}", nextObjective.type());
+        }
+    }
+
     /**
      * As per the OFDPA 2.0 TTP, packets are sent out of ports by using
      * a chain of groups, namely an L3 Unicast Group that points to an L2 Interface
-     * Group which in turns points to an output port. The Next Objective passed
+     * Group which in-turn points to an output port. The Next Objective passed
      * in by the application has to be broken up into a group chain
      * to satisfy this TTP.
      *
@@ -770,7 +825,9 @@
         Integer l3groupId = L3UNICASTMASK | (int) portNum;
         l3utt.group(new DefaultGroupId(l2groupId));
         GroupChainElem gce = new GroupChainElem(l3groupkey, l3groupId,
-                                                l3utt.build(), nextObj.appId());
+                                                GroupDescription.Type.INDIRECT,
+                                                Collections.singletonList(l3utt.build()),
+                                                nextObj.appId(), 1);
 
         // create object for local and distributed storage
         List<GroupKey> gkeys = new ArrayList<GroupKey>();
@@ -797,27 +854,201 @@
     }
 
     /**
+     * As per the OFDPA 2.0 TTP, packets are sent out of ports by using
+     * a chain of groups. The Next Objective passed in by the application
+     * has to be broken up into a group chain comprising of an
+     * L2 Flood group whose buckets point to L2 Interface groups.
+     *
+     * @param nextObj  the nextObjective of type BROADCAST
+     */
+    private void processBroadcastNextObjective(NextObjective nextObj) {
+        // break up broadcast next objective to multiple groups
+        Collection<TrafficTreatment> buckets = nextObj.next();
+
+        // each treatment is converted to an L2 interface group
+        int indicator = 0;
+        VlanId vlanid = null;
+        List<GroupInfo> groupInfoCollection = new ArrayList<>();
+        for (TrafficTreatment treatment : buckets) {
+            TrafficTreatment.Builder newTreatment = DefaultTrafficTreatment.builder();
+            PortNumber portNum = null;
+            // ensure that the only allowed treatments are pop-vlan and output
+            for (Instruction ins : treatment.allInstructions()) {
+                if (ins.type() == Instruction.Type.L2MODIFICATION) {
+                    L2ModificationInstruction l2ins = (L2ModificationInstruction) ins;
+                    switch (l2ins.subtype()) {
+                    case VLAN_POP:
+                        newTreatment.add(l2ins);
+                        break;
+                    default:
+                        log.debug("action {} not permitted for broadcast nextObj",
+                                  l2ins.subtype());
+                        break;
+                    }
+                } else if (ins.type() == Instruction.Type.OUTPUT) {
+                    portNum = ((OutputInstruction) ins).port();
+                    newTreatment.add(ins);
+                } else {
+                    log.debug("TrafficTreatment of type {} not permitted in "
+                            + " broadcast nextObjective", ins.type());
+                }
+            }
+
+            // also ensure that all ports are in the same vlan
+            VlanId thisvlanid = port2Vlan.get(portNum);
+            if (vlanid == null) {
+                vlanid = thisvlanid;
+            } else {
+                if (!vlanid.equals(thisvlanid)) {
+                    log.error("Driver requires all ports in a broadcast nextObj "
+                            + "to be in the same vlan. Different vlans found "
+                            + "{} and {}. Aborting group creation", vlanid, thisvlanid);
+                    return;
+                }
+            }
+
+            // assemble info for all l2 interface groups
+            indicator += GROUP1MASK;
+            int l2gk = nextObj.id() | indicator;
+            final GroupKey l2groupkey = new DefaultGroupKey(appKryo.serialize(l2gk));
+            Integer l2groupId = L2INTERFACEMASK | (vlanid.toShort() << 16) |
+                                    (int) portNum.toLong();
+            GroupBucket newbucket =
+                    DefaultGroupBucket.createIndirectGroupBucket(newTreatment.build());
+
+            // store the info needed to create this group
+            groupInfoCollection.add(new GroupInfo(l2groupId, l2groupkey, newbucket));
+        }
+
+        // assemble info for l2 flood group
+        int l2floodgk = nextObj.id() | GROUP0MASK;
+        final GroupKey l2floodgroupkey = new DefaultGroupKey(appKryo.serialize(l2floodgk));
+        Integer l2floodgroupId = L2FLOODMASK | (vlanid.toShort() << 16) | nextObj.id();
+        // collection of treatment with groupids of l2 interface groups
+        List<TrafficTreatment> floodtt = new ArrayList<>();
+        for (GroupInfo gi : groupInfoCollection) {
+            TrafficTreatment.Builder ttb = DefaultTrafficTreatment.builder();
+            ttb.group(new DefaultGroupId(gi.groupId));
+            floodtt.add(ttb.build());
+        }
+        GroupChainElem gce = new GroupChainElem(l2floodgroupkey, l2floodgroupId,
+                                                GroupDescription.Type.ALL,
+                                                floodtt,
+                                                nextObj.appId(),
+                                                groupInfoCollection.size());
+
+        // create objects for local and distributed storage
+        List<GroupKey> gkeys = new ArrayList<GroupKey>();
+        gkeys.add(l2floodgroupkey); // group0 in chain
+        OfdpaGroupChain ofdpaGrp = new OfdpaGroupChain(gkeys, nextObj);
+
+        // store l2floodgroupkey with the ofdpaGroupChain for the nextObjective
+        // that depends on it
+        pendingNextObjectives.put(l2floodgroupkey, ofdpaGrp);
+
+        for (GroupInfo gi : groupInfoCollection) {
+            // store all l2groupkeys with the groupChainElem for the l2floodgroup
+            // that depends on it
+            pendingGroups.put(gi.groupKey, gce);
+
+            // create and send groups for all l2 interface groups
+            GroupDescription groupDescription =
+                    new DefaultGroupDescription(
+                            deviceId,
+                            GroupDescription.Type.INDIRECT,
+                            new GroupBuckets(Collections.singletonList(gi.groupBucket)),
+                            gi.groupKey,
+                            gi.groupId,
+                            nextObj.appId());
+            groupService.addGroup(groupDescription);
+        }
+    }
+
+    private class GroupInfo {
+        private Integer groupId;
+        private GroupKey groupKey;
+        private GroupBucket groupBucket;
+
+        GroupInfo(Integer groupId, GroupKey groupKey, GroupBucket groupBucket) {
+            this.groupBucket = groupBucket;
+            this.groupId = groupId;
+            this.groupKey = groupKey;
+        }
+    }
+
+    private void processHashedNextObjective(NextObjective nextObj) {
+        // TODO Auto-generated method stub
+    }
+
+    private void addBucketToGroup(NextObjective nextObjective) {
+        // TODO Auto-generated method stub
+    }
+
+    private void removeBucketFromGroup(NextObjective nextObjective) {
+        // TODO Auto-generated method stub
+    }
+
+    private void removeGroup(NextObjective nextObjective) {
+        // TODO Auto-generated method stub
+    }
+
+    /**
      * Processes next element of a group chain. Assumption is that if this
      * group points to another group, the latter has already been created
      * and this driver has received notification for it. A second assumption is
      * that if there is another group waiting for this group then the appropriate
      * stores already have the information to act upon the notification for the
      * creating of this group.
+     * <p>
+     * The processing of the GroupChainElement depends on the number of groups
+     * this element is waiting on. For all group types other than SIMPLE, a
+     * GroupChainElement could be waiting on multiple groups.
      *
      * @param gce the group chain element to be processed next
      */
     private void processGroupChain(GroupChainElem gce) {
-        GroupBucket bucket = DefaultGroupBucket
-                .createIndirectGroupBucket(gce.getBucketActions());
-        GroupDescription groupDesc = new DefaultGroupDescription(deviceId,
-                             GroupDescription.Type.INDIRECT,
-                             new GroupBuckets(Collections.singletonList(bucket)),
-                             gce.getGkey(),
-                             gce.getGivenGroupId(),
-                             gce.getAppId());
-        groupService.addGroup(groupDesc);
-    }
+        int waitOnGroups = gce.decrementAndGetGroupsWaitedOn();
+        if (waitOnGroups != 0) {
+            log.debug("GCE: {} waiting on {} groups. Not processing yet",
+                      gce, waitOnGroups);
+            return;
+        }
+        List<GroupBucket> buckets = new ArrayList<>();
+        switch (gce.groupType) {
+        case INDIRECT:
+            GroupBucket ibucket = DefaultGroupBucket
+                .createIndirectGroupBucket(gce.bucketActions.iterator().next());
+            buckets.add(ibucket);
+            break;
+        case ALL:
+            for (TrafficTreatment tt : gce.bucketActions) {
+                GroupBucket abucket = DefaultGroupBucket
+                        .createAllGroupBucket(tt);
+                buckets.add(abucket);
+            }
+            break;
+        case SELECT:
+            for (TrafficTreatment tt : gce.bucketActions) {
+                GroupBucket sbucket = DefaultGroupBucket
+                        .createSelectGroupBucket(tt);
+                buckets.add(sbucket);
+            }
+            break;
+        case FAILOVER:
+        default:
+            log.error("Unknown or unimplemented GroupChainElem {}", gce);
+        }
 
+        if (buckets.size() > 0) {
+            GroupDescription groupDesc = new DefaultGroupDescription(
+                                                 deviceId, gce.groupType,
+                                                 new GroupBuckets(buckets),
+                                                 gce.gkey,
+                                                 gce.givenGroupId,
+                                                 gce.appId);
+            groupService.addGroup(groupDesc);
+        }
+    }
 
     private class GroupChecker implements Runnable {
         @Override
@@ -837,7 +1068,7 @@
                     log.info("Group service processed group key {}. Processing next "
                             + "group in group chain with group key {}",
                             appKryo.deserialize(key.key()),
-                            appKryo.deserialize(gce.getGkey().key()));
+                            appKryo.deserialize(gce.gkey.key()));
                     processGroupChain(gce);
                 } else {
                     OfdpaGroupChain obj = pendingNextObjectives.getIfPresent(key);
@@ -866,7 +1097,7 @@
                     log.info("group ADDED with group key {} .. "
                             + "Processing next group in group chain with group key {}",
                             appKryo.deserialize(key.key()),
-                            appKryo.deserialize(gce.getGkey().key()));
+                            appKryo.deserialize(gce.gkey.key()));
                     processGroupChain(gce);
                 } else {
                     OfdpaGroupChain obj = pendingNextObjectives.getIfPresent(key);
@@ -890,6 +1121,11 @@
      * look like group0 --> group 1 --> outPort. Information about the groups
      * themselves can be fetched from the Group Service using the group keys from
      * objects instantiating this class.
+     *
+     * XXX Revisit this - since the forwarding objective only ever needs the
+     * groupkey of the top-level group in the group chain, why store a series
+     * of groupkeys. Also the group-chain list only works for 1-to-1 chaining,
+     * not for 1-to-many chaining.
      */
     private class OfdpaGroupChain implements NextGroup {
         private final NextObjective nextObj;
@@ -925,33 +1161,40 @@
      * preceding groups in the group chain to be created.
      */
     private class GroupChainElem {
-        private TrafficTreatment bucketActions;
+        private Collection<TrafficTreatment> bucketActions;
         private Integer givenGroupId;
+        private GroupDescription.Type groupType;
         private GroupKey gkey;
         private ApplicationId appId;
+        private AtomicInteger waitOnGroups;
 
-        public GroupChainElem(GroupKey gkey, Integer givenGroupId,
-                              TrafficTreatment tr, ApplicationId appId) {
+        GroupChainElem(GroupKey gkey, Integer givenGroupId,
+                       GroupDescription.Type groupType,
+                       Collection<TrafficTreatment> tr, ApplicationId appId,
+                       int waitOnGroups) {
             this.bucketActions = tr;
             this.givenGroupId = givenGroupId;
+            this.groupType = groupType;
             this.gkey = gkey;
             this.appId = appId;
+            this.waitOnGroups = new AtomicInteger(waitOnGroups);
         }
 
-        public TrafficTreatment getBucketActions() {
-            return bucketActions;
+        /**
+         * This methods atomically decrements the counter for the number of
+         * groups this GroupChainElement is waiting on, for notifications from
+         * the Group Service. When this method returns a value of 0, this
+         * GroupChainElement is ready to be processed.
+         *
+         * @return integer indication of the number of notifications being waited on
+         */
+        int decrementAndGetGroupsWaitedOn() {
+            return waitOnGroups.decrementAndGet();
         }
 
-        public Integer getGivenGroupId() {
-            return givenGroupId;
-        }
-
-        public GroupKey getGkey() {
-            return gkey;
-        }
-
-        public ApplicationId getAppId() {
-            return appId;
+        @Override
+        public String toString() {
+            return Integer.toHexString(givenGroupId);
         }
 
     }