CORD-414 Editing hash groups buckets in the OF-DPA driver instead of
creating new groups.
Also in this commit - fix for NPE in groups cli, and removal of unnecessary
cpqd-ofdpa3 driver.
Change-Id: I2a5dd183cb38ed901caa5a806791b77e9d92d93c
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/IcmpHandler.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/IcmpHandler.java
index 22c9266..21f00df 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/IcmpHandler.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/IcmpHandler.java
@@ -168,7 +168,7 @@
treatment, ByteBuffer.wrap(payload.serialize()));
srManager.packetService.emit(packet);
} else {
- log.info("Send a MPLS packet as a ICMP response");
+ log.debug("Send a MPLS packet as a ICMP response");
TrafficTreatment treatment = DefaultTrafficTreatment.builder()
.setOutput(outport.port())
.build();
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
index e82049b..c378541 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
@@ -256,25 +256,13 @@
nextId);
ObjectiveContext context = new DefaultObjectiveContext(
- (objective) -> log.debug("LinkUp installed NextObj {} on {}",
+ (objective) -> log.debug("LinkUp addedTo NextObj {} on {}",
nextId, deviceId),
(objective, error) ->
- log.warn("LinkUp failed to install NextObj {} on {}: {}",
+ log.warn("LinkUp failed to addTo NextObj {} on {}: {}",
nextId, deviceId, error));
NextObjective nextObjective = nextObjBuilder.addToExisting(context);
flowObjectiveService.next(deviceId, nextObjective);
-
- // the addition of a bucket may actually change the neighborset
- // update the global store
- /*
- Set<DeviceId> neighbors = new HashSet<DeviceId>(ns.getDeviceIds());
- boolean newadd = neighbors.add(newLink.dst().deviceId());
- if (newadd) {
- NeighborSet nsnew = new NeighborSet(neighbors, ns.getEdgeLabel());
- nsNextObjStore.put(new NeighborSetNextObjectiveStoreKey(deviceId, nsnew),
- nextId);
- nsNextObjStore.remove(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
- }*/
} else if (isMaster) {
log.warn("linkUp in device {}, but global store has no record "
+ "for neighbor-set {}", deviceId, ns);
@@ -294,7 +282,6 @@
return;
}
- @SuppressWarnings("unused")
MacAddress dstMac;
try {
dstMac = deviceConfig.getDeviceMac(portDeviceMap.get(port));
@@ -323,13 +310,6 @@
new NeighborSetNextObjectiveStoreKey(deviceId, ns);
Integer nextId = nsNextObjStore.get(nsStoreKey);
if (nextId != null && isMaster) {
- // XXX This is a workaround for BUG (CORD-611) in current switches.
- // Should be temporary because this workaround prevents correct
- // functionality in LAG recovery.
- log.info("**portDown port:{} in device {}: Invalidating nextId {}",
- port, deviceId, nextId);
- nsNextObjStore.remove(nsStoreKey);
- /*
log.info("**portDown in device {}: Removing Bucket "
+ "with Port {} to next object id {}",
deviceId,
@@ -352,24 +332,17 @@
.withId(nextId)
.fromApp(appId)
.addTreatment(tBuilder.build());
+ ObjectiveContext context = new DefaultObjectiveContext(
+ (objective) -> log.debug("portDown removedFrom NextObj {} on {}",
+ nextId, deviceId),
+ (objective, error) ->
+ log.warn("portDown failed to removeFrom NextObj {} on {}: {}",
+ nextId, deviceId, error));
NextObjective nextObjective = nextObjBuilder.
- removeFromExisting(new SRNextObjectiveContext(deviceId));
+ removeFromExisting(context);
flowObjectiveService.next(deviceId, nextObjective);
- */
- // the removal of a bucket may actually change the neighborset
- // update the global store
- /*
- Set<DeviceId> neighbors = new HashSet<DeviceId>(ns.getDeviceIds());
- boolean removed = neighbors.remove(portDeviceMap.get(port));
- if (removed) {
- NeighborSet nsnew = new NeighborSet(neighbors, ns.getEdgeLabel());
- nsNextObjStore.put(new NeighborSetNextObjectiveStoreKey(deviceId, nsnew),
- nextId);
- nsNextObjStore.remove(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
- }*/
}
-
}
devicePortMap.get(portDeviceMap.get(port)).remove(port);
diff --git a/cli/src/main/java/org/onosproject/cli/net/GroupsListCommand.java b/cli/src/main/java/org/onosproject/cli/net/GroupsListCommand.java
index 76f2c38..8c9cfcf 100644
--- a/cli/src/main/java/org/onosproject/cli/net/GroupsListCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/GroupsListCommand.java
@@ -107,8 +107,13 @@
if (state != null && !state.equals("any")) {
s = GroupState.valueOf(state.toUpperCase());
}
- Iterable<Device> devices = (uri == null) ? deviceService.getDevices() :
- Collections.singletonList(deviceService.getDevice(DeviceId.deviceId(uri)));
+ Iterable<Device> devices = deviceService.getDevices();
+ if (uri != null) {
+ Device dev = deviceService.getDevice(DeviceId.deviceId(uri));
+ if (dev != null) {
+ devices = Collections.singletonList(dev);
+ }
+ }
for (Device d : devices) {
if (s == null) {
groups = newArrayList(groupService.getGroups(d.id()));
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/CpqdOfdpa2Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/CpqdOfdpa2Pipeline.java
index ad53d71..ff2d720 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/CpqdOfdpa2Pipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/CpqdOfdpa2Pipeline.java
@@ -238,8 +238,8 @@
}
/*
- * Cpqd emulation does not require the non-OF standard rules for
- * matching untagged packets.
+ * Cpqd emulation does not require the non OF-standard rules for
+ * matching untagged packets that ofdpa uses.
*
* (non-Javadoc)
* @see org.onosproject.driver.pipeline.OFDPA2Pipeline#processVlanIdFilter
@@ -510,7 +510,8 @@
// we only need the top level group's key to point the flow to it
Group group = groupService.getGroup(deviceId, gkeys.get(0).peekFirst());
if (group == null) {
- log.warn("The group left!");
+ log.warn("Group with key:{} for next-id:{} not found in dev:{}",
+ gkeys.get(0).peekFirst(), fwd.nextId(), deviceId);
fail(fwd, ObjectiveError.GROUPMISSING);
return Collections.emptySet();
}
@@ -549,7 +550,6 @@
flowRuleCollection.add(rule.build());
log.debug("Default rule 0.0.0.0/0 is being installed two rules");
}
-
return flowRuleCollection;
}
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/CpqdOfdpa3Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/CpqdOfdpa3Pipeline.java
deleted file mode 100644
index 9cf05d2..0000000
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/CpqdOfdpa3Pipeline.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.driver.pipeline;
-
-import org.onosproject.core.CoreService;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.behaviour.PipelinerContext;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.FlowRuleService;
-import org.onosproject.net.flowobjective.ForwardingObjective;
-import org.onosproject.net.group.GroupService;
-
-import java.util.Collection;
-
-/**
- * Driver for software switch emulation of the OFDPA 3.0 pipeline.
- * The software switch is the CPqD OF 1.3 switch. Unfortunately the CPqD switch
- * does not handle vlan tags and mpls labels simultaneously, which requires us
- * to do some workarounds in the driver. This driver is meant for the use of
- * the cpqd switch when MPLS is required. As a result this driver works only
- * on incoming untagged packets.
- */
-public class CpqdOfdpa3Pipeline extends CpqdOfdpa2Pipeline {
- @Override
- public void init(DeviceId deviceId, PipelinerContext context) {
- this.deviceId = deviceId;
-
- // Initialize OFDPA group handler
- groupHandler = new Ofdpa3GroupHandler();
- groupHandler.init(deviceId, context);
-
- serviceDirectory = context.directory();
- coreService = serviceDirectory.get(CoreService.class);
- flowRuleService = serviceDirectory.get(FlowRuleService.class);
- groupService = serviceDirectory.get(GroupService.class);
- flowObjectiveStore = context.store();
- deviceService = serviceDirectory.get(DeviceService.class);
-
- driverId = coreService.registerApplication(
- "org.onosproject.driver.CpqdOfdpa3Pipeline");
-
- initializePipeline();
- }
-
- @Override
- protected Collection<FlowRule> processEthTypeSpecific(ForwardingObjective fwd) {
- return processEthTypeSpecificInternal(fwd, true, ACL_TABLE);
- }
-}
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2GroupHandler.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2GroupHandler.java
index 0098d01..7d85f19 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2GroupHandler.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2GroupHandler.java
@@ -64,6 +64,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -125,7 +126,8 @@
// local store for pending bucketAdds - by design there can only be one
// pending bucket for a group
- protected ConcurrentHashMap<Integer, NextObjective> pendingBuckets = new ConcurrentHashMap<>();
+ protected ConcurrentHashMap<Integer, NextObjective> pendingBuckets =
+ new ConcurrentHashMap<>();
protected void init(DeviceId deviceId, PipelinerContext context) {
this.deviceId = deviceId;
@@ -236,9 +238,8 @@
Deque<GroupKey> gkeyChain = new ArrayDeque<>();
gkeyChain.addFirst(groupInfo.innerMostGroupDesc.appCookie());
gkeyChain.addFirst(groupInfo.nextGroupDesc.appCookie());
- OfdpaNextGroup ofdpaGrp = new OfdpaNextGroup(
- Collections.singletonList(gkeyChain),
- nextObj);
+ OfdpaNextGroup ofdpaGrp =
+ new OfdpaNextGroup(Collections.singletonList(gkeyChain), nextObj);
// store l3groupkey with the ofdpaNextGroup for the nextObjective that depends on it
updatePendingNextObjective(groupInfo.nextGroupDesc.appCookie(), ofdpaGrp);
@@ -522,7 +523,8 @@
}
}
- private List<GroupInfo> prepareL2InterfaceGroup(NextObjective nextObj, VlanId assignedVlan) {
+ private List<GroupInfo> prepareL2InterfaceGroup(NextObjective nextObj,
+ VlanId assignedVlan) {
ImmutableList.Builder<GroupInfo> groupInfoBuilder = ImmutableList.builder();
// break up broadcast next objective to multiple groups
@@ -588,12 +590,14 @@
return groupInfoBuilder.build();
}
- private void createL2FloodGroup(NextObjective nextObj, VlanId vlanId, List<GroupInfo> groupInfos) {
- // assemble info for l2 flood group
- // since there can be only one flood group for a vlan, its index is always the same - 0
+ private void createL2FloodGroup(NextObjective nextObj, VlanId vlanId,
+ List<GroupInfo> groupInfos) {
+ // assemble info for l2 flood group. Since there can be only one flood
+ // group for a vlan, its index is always the same - 0
Integer l2floodgroupId = L2_FLOOD_TYPE | (vlanId.toShort() << 16);
int l2floodgk = getNextAvailableIndex();
- final GroupKey l2floodgroupkey = new DefaultGroupKey(Ofdpa2Pipeline.appKryo.serialize(l2floodgk));
+ final GroupKey l2floodgroupkey =
+ new DefaultGroupKey(Ofdpa2Pipeline.appKryo.serialize(l2floodgk));
// collection of group buckets pointing to all the l2 interface groups
List<GroupBucket> l2floodBuckets = Lists.newArrayList();
@@ -642,7 +646,8 @@
});
}
- private void createL3MulticastGroup(NextObjective nextObj, VlanId vlanId, List<GroupInfo> groupInfos) {
+ private void createL3MulticastGroup(NextObjective nextObj, VlanId vlanId,
+ List<GroupInfo> groupInfos) {
List<GroupBucket> l3McastBuckets = new ArrayList<>();
groupInfos.forEach(groupInfo -> {
// Points to L3 interface group if there is one.
@@ -656,8 +661,10 @@
});
int l3MulticastIndex = getNextAvailableIndex();
- int l3MulticastGroupId = L3_MULTICAST_TYPE | vlanId.toShort() << 16 | (TYPE_VLAN_MASK & l3MulticastIndex);
- final GroupKey l3MulticastGroupKey = new DefaultGroupKey(Ofdpa2Pipeline.appKryo.serialize(l3MulticastIndex));
+ int l3MulticastGroupId = L3_MULTICAST_TYPE |
+ vlanId.toShort() << 16 | (TYPE_VLAN_MASK & l3MulticastIndex);
+ final GroupKey l3MulticastGroupKey =
+ new DefaultGroupKey(Ofdpa2Pipeline.appKryo.serialize(l3MulticastIndex));
GroupDescription l3MulticastGroupDesc = new DefaultGroupDescription(deviceId,
GroupDescription.Type.ALL,
@@ -891,8 +898,10 @@
/**
* Adds a bucket to the top level group of a group-chain, and creates the chain.
+ * Ensures that bucket being added is not a duplicate, by checking existing
+ * buckets for the same outport.
*
- * @param nextObjective the next group to add a bucket to
+ * @param nextObjective the bucket information for a next group
* @param next the representation of the existing group-chain for this next objective
*/
protected void addBucketToGroup(NextObjective nextObjective, NextGroup next) {
@@ -905,6 +914,30 @@
log.warn("Only one bucket can be added at a time");
return;
}
+ // first check to see if bucket being added is not a duplicate of an
+ // existing bucket. If it is for an existing outport, then its a duplicate.
+ Set<PortNumber> existingOutPorts = new HashSet<>();
+ List<Deque<GroupKey>> allActiveKeys = Ofdpa2Pipeline.appKryo.deserialize(next.data());
+ for (Deque<GroupKey> gkeys : allActiveKeys) {
+ // get the last group for the outport
+ Group glast = groupService.getGroup(deviceId, gkeys.peekLast());
+ if (glast != null && !glast.buckets().buckets().isEmpty()) {
+ PortNumber op = readOutPortFromTreatment(
+ glast.buckets().buckets().get(0).treatment());
+ if (op != null) {
+ existingOutPorts.add(op);
+ }
+ }
+ }
+ // only a single bucket being added
+ TrafficTreatment tt = nextObjective.next().iterator().next();
+ PortNumber newport = readOutPortFromTreatment(tt);
+ if (existingOutPorts.contains(newport)) {
+ log.warn("Attempt to add bucket for existing outport:{} in dev:{} for next:{}",
+ newport, deviceId, nextObjective.id());
+ return;
+ }
+
// storage for all group keys in the chain of groups created
List<Deque<GroupKey>> allGroupKeys = new ArrayList<>();
List<GroupInfo> unsentGroups = new ArrayList<>();
@@ -916,12 +949,28 @@
ttb.group(new DefaultGroupId(gi.nextGroupDesc.givenGroupId()));
GroupBucket sbucket = DefaultGroupBucket.createSelectGroupBucket(ttb.build());
- // recreate the original L3 ECMP group id and description
- int l3ecmpGroupId = L3_ECMP_TYPE | nextObjective.id() << 12;
- GroupKey l3ecmpGroupKey = new DefaultGroupKey(Ofdpa2Pipeline.appKryo.serialize(l3ecmpGroupId));
+ // retrieve the original L3 ECMP group id and description from the first
+ // element in any deque.
+ GroupKey l3ecmpGroupKey = null;
+ if (!allActiveKeys.isEmpty()) {
+ l3ecmpGroupKey = allActiveKeys.get(0).peekFirst();
+ } else {
+ log.warn("Could not determine top level group while trying to"
+ + "add bucket for port:{} in dev:{} for next:{}",
+ deviceId, nextObjective.id(), newport);
+ return;
+ }
+ Group l3ecmpGroup = groupService.getGroup(deviceId, l3ecmpGroupKey);
+ if (l3ecmpGroup == null) {
+ log.warn("Could not find l3 ecmp group while trying to add bucket"
+ + "for port:{} in dev:{} for next:{}", deviceId,
+ nextObjective.id(), newport);
+ return;
+ }
+ int l3ecmpGroupId = l3ecmpGroup.id().id();
// Although GroupDescriptions are not necessary for adding buckets to
- // existing groups, we use one in the GroupChainElem. When the latter is
+ // existing groups, we still use one in the GroupChainElem. When the latter is
// processed, the info will be extracted for the bucketAdd call to groupService
GroupDescription l3ecmpGroupDesc =
new DefaultGroupDescription(
@@ -934,14 +983,16 @@
GroupChainElem l3ecmpGce = new GroupChainElem(l3ecmpGroupDesc, 1, true);
// update original NextGroup with new bucket-chain
- // don't need to update pendingNextObjectives -- group already exists
+ // If active keys shows only the top-level group without a chain of groups,
+ // then it represents an empty group. Update by replacing empty chain.
Deque<GroupKey> newBucketChain = allGroupKeys.get(0);
newBucketChain.addFirst(l3ecmpGroupKey);
- List<Deque<GroupKey>> allOriginalKeys = Ofdpa2Pipeline.appKryo.deserialize(next.data());
- allOriginalKeys.add(newBucketChain);
- flowObjectiveStore.putNextGroup(nextObjective.id(),
- new OfdpaNextGroup(allOriginalKeys, nextObjective));
-
+ if (allActiveKeys.size() == 1 && allActiveKeys.get(0).size() == 1) {
+ allActiveKeys.clear();
+ }
+ allActiveKeys.add(newBucketChain);
+ updatePendingNextObjective(l3ecmpGroupKey,
+ new OfdpaNextGroup(allActiveKeys, nextObjective));
log.debug("Adding to L3ECMP: device:{} gid:{} gkey:{} nextId:{}",
deviceId, Integer.toHexString(l3ecmpGroupId),
l3ecmpGroupKey, nextObjective.id());
@@ -958,7 +1009,7 @@
* not remove the groups in a group-chain pointed to by this bucket, as they
* may be in use (referenced by other groups) elsewhere.
*
- * @param nextObjective the next group to remove a bucket from
+ * @param nextObjective the bucket information for a next group
* @param next the representation of the existing group-chain for this next objective
*/
protected void removeBucketFromGroup(NextObjective nextObjective, NextGroup next) {
@@ -971,41 +1022,29 @@
TrafficTreatment treatment = treatments.iterator().next();
// find the bucket to remove by noting the outport, and figuring out the
// top-level group in the group-chain that indirectly references the port
- PortNumber outport = null;
- for (Instruction ins : treatment.allInstructions()) {
- if (ins instanceof Instructions.OutputInstruction) {
- outport = ((Instructions.OutputInstruction) ins).port();
- break;
- }
- }
- if (outport == null) {
- log.error("next objective {} has no outport", nextObjective.id());
+ PortNumber portToRemove = readOutPortFromTreatment(treatment);
+ if (portToRemove == null) {
+ log.warn("next objective {} has no outport.. cannot remove bucket"
+ + "from group in dev: {}", nextObjective.id(), deviceId);
return;
}
- List<Deque<GroupKey>> allgkeys = Ofdpa2Pipeline.appKryo.deserialize(next.data());
+ List<Deque<GroupKey>> allActiveKeys = Ofdpa2Pipeline.appKryo.deserialize(next.data());
Deque<GroupKey> foundChain = null;
int index = 0;
- for (Deque<GroupKey> gkeys : allgkeys) {
+ for (Deque<GroupKey> gkeys : allActiveKeys) {
+ // last group in group chain should have a single bucket pointing to port
GroupKey groupWithPort = gkeys.peekLast();
Group group = groupService.getGroup(deviceId, groupWithPort);
if (group == null) {
- log.warn("Inconsistent group chain");
+ log.warn("Inconsistent group chain found when removing bucket"
+ + "for next:{} in dev:{}", nextObjective.id(), deviceId);
continue;
}
- // last group in group chain should have a single bucket pointing to port
- List<Instruction> lastIns = group.buckets().buckets().iterator()
- .next().treatment().allInstructions();
- for (Instruction i : lastIns) {
- if (i instanceof Instructions.OutputInstruction) {
- PortNumber lastport = ((Instructions.OutputInstruction) i).port();
- if (lastport.equals(outport)) {
- foundChain = gkeys;
- break;
- }
- }
- }
- if (foundChain != null) {
+ PortNumber pout = readOutPortFromTreatment(
+ group.buckets().buckets().get(0).treatment());
+ if (pout.equals(portToRemove)) {
+ foundChain = gkeys;
break;
}
index++;
@@ -1030,15 +1069,24 @@
.build());
GroupBuckets removeBuckets = new GroupBuckets(Collections
.singletonList(bucket));
- log.debug("Removing buckets from group id {} for next id {} in device {}",
- modGroup.id(), nextObjective.id(), deviceId);
+ log.debug("Removing buckets from group id 0x{} pointing to group id 0x{}"
+ + "for next id {} in device {}", Integer.toHexString(modGroup.id().id()),
+ Integer.toHexString(pointedGroup.id().id()), nextObjective.id(), deviceId);
groupService.removeBucketsFromGroup(deviceId, modGroupKey,
removeBuckets, modGroupKey,
nextObjective.appId());
- //update store
- allgkeys.remove(index);
+ // update store
+ // If the bucket removed was the last bucket in the group, then
+ // retain an entry for the top level group which still exists.
+ if (allActiveKeys.size() == 1) {
+ ArrayDeque<GroupKey> top = new ArrayDeque<>();
+ top.add(modGroupKey);
+ allActiveKeys.add(top);
+ }
+ allActiveKeys.remove(index);
flowObjectiveStore.putNextGroup(nextObjective.id(),
- new OfdpaNextGroup(allgkeys, nextObjective));
+ new OfdpaNextGroup(allActiveKeys,
+ nextObjective));
} else {
log.warn("Could not find appropriate group-chain for removing bucket"
+ " for next id {} in dev:{}", nextObjective.id(), deviceId);
@@ -1054,13 +1102,13 @@
* this next objective
*/
protected void removeGroup(NextObjective nextObjective, NextGroup next) {
- List<Deque<GroupKey>> allgkeys = Ofdpa2Pipeline.appKryo.deserialize(next.data());
+ List<Deque<GroupKey>> allActiveKeys = Ofdpa2Pipeline.appKryo.deserialize(next.data());
- List<GroupKey> groupKeys = allgkeys.stream()
+ List<GroupKey> groupKeys = allActiveKeys.stream()
.map(Deque::getFirst).collect(Collectors.toList());
pendingRemoveNextObjectives.put(nextObjective, groupKeys);
- allgkeys.forEach(groupChain -> groupChain.forEach(groupKey ->
+ allActiveKeys.forEach(groupChain -> groupChain.forEach(groupKey ->
groupService.removeGroup(deviceId, groupKey, nextObjective.appId())));
flowObjectiveStore.removeNextGroup(nextObjective.id());
}
@@ -1207,6 +1255,21 @@
}
/**
+ * Returns the outport in a traffic treatment.
+ *
+ * @param tt the treatment
+ * @return the PortNumber for the outport or null
+ */
+ protected static PortNumber readOutPortFromTreatment(TrafficTreatment tt) {
+ for (Instruction ins : tt.allInstructions()) {
+ if (ins.type() == Instruction.Type.OUTPUT) {
+ return ((Instructions.OutputInstruction) ins).port();
+ }
+ }
+ return null;
+ }
+
+ /**
* Returns a hash as the L2 Interface Group Key.
*
* Keep the lower 6-bit for port since port number usually smaller than 64.
@@ -1264,7 +1327,7 @@
* top level ECMP group, while every other element represents a unique groupKey.
* <p>
* Also includes information about the next objective that
- * resulted in this group-chain.
+ * resulted in these group-chains.
*
*/
protected class OfdpaNextGroup implements NextGroup {
@@ -1272,18 +1335,18 @@
private final List<Deque<GroupKey>> gkeys;
public OfdpaNextGroup(List<Deque<GroupKey>> gkeys, NextObjective nextObj) {
- this.gkeys = gkeys;
this.nextObj = nextObj;
- }
-
- public List<Deque<GroupKey>> groupKey() {
- return gkeys;
+ this.gkeys = gkeys;
}
public NextObjective nextObjective() {
return nextObj;
}
+ public List<Deque<GroupKey>> groupKeys() {
+ return gkeys;
+ }
+
@Override
public byte[] data() {
return Ofdpa2Pipeline.appKryo.serialize(gkeys);
@@ -1295,7 +1358,7 @@
* Stores enough information to create a Group Description to add the group
* to the switch by requesting the Group Service. Objects instantiating this
* class are meant to be temporary and live as long as it is needed to wait for
- * preceding groups in the group chain to be created.
+ * referenced groups in the group chain to be created.
*/
protected class GroupChainElem {
private GroupDescription groupDescription;
@@ -1310,7 +1373,7 @@
}
/**
- * This methods atomically decrements the counter for the number of
+ * This method atomically decrements the counter for the number of
* groups this GroupChainElement is waiting on, for notifications from
* the Group Service. When this method returns a value of 0, this
* GroupChainElement is ready to be processed.
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
index a72eea1..4600eb8 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
@@ -15,6 +15,8 @@
*/
package org.onosproject.driver.pipeline;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayDeque;
@@ -26,6 +28,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -131,6 +135,11 @@
protected Set<IPCriterion> sentIpFilters = Collections.newSetFromMap(
new ConcurrentHashMap<>());
+ // flows installations to be retried
+ protected ScheduledExecutorService executorService
+ = newScheduledThreadPool(5, groupedThreads("OfdpaPipeliner", "retry-%d", log));
+ protected static final int MAX_RETRY_ATTEMPTS = 10;
+ protected static final int RETRY_MS = 1000;
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
@@ -181,30 +190,33 @@
@Override
public void forward(ForwardingObjective fwd) {
- Collection<FlowRule> rules;
- FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
-
- rules = processForward(fwd);
+ Collection<FlowRule> rules = processForward(fwd);
if (rules == null || rules.isEmpty()) {
// Assumes fail message has already been generated to the objective
// context. Returning here prevents spurious pass message to be
// generated by FlowRule service for empty flowOps.
return;
}
+ sendForward(fwd, rules);
+ }
+
+ protected void sendForward(ForwardingObjective fwd, Collection<FlowRule> rules) {
+ FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
switch (fwd.op()) {
- case ADD:
- rules.stream()
- .filter(Objects::nonNull)
- .forEach(flowOpsBuilder::add);
- break;
- case REMOVE:
- rules.stream()
- .filter(Objects::nonNull)
- .forEach(flowOpsBuilder::remove);
- break;
- default:
- fail(fwd, ObjectiveError.UNKNOWN);
- log.warn("Unknown forwarding type {}", fwd.op());
+ case ADD:
+ rules.stream()
+ .filter(Objects::nonNull)
+ .forEach(flowOpsBuilder::add);
+ log.info("Applying a flow rule to sw:{}", deviceId);
+ break;
+ case REMOVE:
+ rules.stream()
+ .filter(Objects::nonNull)
+ .forEach(flowOpsBuilder::remove);
+ break;
+ default:
+ fail(fwd, ObjectiveError.UNKNOWN);
+ log.warn("Unknown forwarding type {}", fwd.op());
}
flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
@@ -802,24 +814,6 @@
}
}
- private boolean isSupportedEthTypeObjective(ForwardingObjective fwd) {
- TrafficSelector selector = fwd.selector();
- EthTypeCriterion ethType = (EthTypeCriterion) selector
- .getCriterion(Criterion.Type.ETH_TYPE);
- return !((ethType == null) ||
- ((ethType.ethType().toShort() != Ethernet.TYPE_IPV4) &&
- (ethType.ethType().toShort() != Ethernet.MPLS_UNICAST)));
- }
-
- private boolean isSupportedEthDstObjective(ForwardingObjective fwd) {
- TrafficSelector selector = fwd.selector();
- EthCriterion ethDst = (EthCriterion) selector
- .getCriterion(Criterion.Type.ETH_DST);
- VlanIdCriterion vlanId = (VlanIdCriterion) selector
- .getCriterion(Criterion.Type.VLAN_VID);
- return !(ethDst == null && vlanId == null);
- }
-
/**
* Handles forwarding rules to the IP and MPLS tables.
*
@@ -850,6 +844,7 @@
(EthTypeCriterion) selector.getCriterion(Criterion.Type.ETH_TYPE);
boolean defaultRule = false;
boolean popMpls = false;
+ boolean emptyGroup = false;
int forTableId;
TrafficSelector.Builder filteredSelector = DefaultTrafficSelector.builder();
TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder();
@@ -967,6 +962,12 @@
return Collections.emptySet();
}
tb.deferred().group(group.id());
+ // check if group is empty
+ if (gkeys.size() == 1 && gkeys.get(0).size() == 1) {
+ log.warn("Found empty group 0x{} in dev:{} .. will retry fwd:{}",
+ Integer.toHexString(group.id().id()), deviceId, fwd.id());
+ emptyGroup = true;
+ }
} else {
log.warn("Cannot find group for nextId:{} in dev:{}. Aborting fwd:{}",
fwd.nextId(), deviceId, fwd.id());
@@ -1016,7 +1017,11 @@
flowRuleCollection.add(rule.build());
log.debug("Default rule 0.0.0.0/0 is being installed two rules");
}
-
+ // XXX retrying flows may be necessary due to bug CORD-554
+ if (emptyGroup) {
+ executorService.schedule(new RetryFlows(fwd, flowRuleCollection),
+ RETRY_MS, TimeUnit.MILLISECONDS);
+ }
return flowRuleCollection;
}
@@ -1103,6 +1108,28 @@
return rules;
}
+ //////////////////////////////////////
+ // Helper Methods and Classes
+ //////////////////////////////////////
+
+ private boolean isSupportedEthTypeObjective(ForwardingObjective fwd) {
+ TrafficSelector selector = fwd.selector();
+ EthTypeCriterion ethType = (EthTypeCriterion) selector
+ .getCriterion(Criterion.Type.ETH_TYPE);
+ return !((ethType == null) ||
+ ((ethType.ethType().toShort() != Ethernet.TYPE_IPV4) &&
+ (ethType.ethType().toShort() != Ethernet.MPLS_UNICAST)));
+ }
+
+ private boolean isSupportedEthDstObjective(ForwardingObjective fwd) {
+ TrafficSelector selector = fwd.selector();
+ EthCriterion ethDst = (EthCriterion) selector
+ .getCriterion(Criterion.Type.ETH_DST);
+ VlanIdCriterion vlanId = (VlanIdCriterion) selector
+ .getCriterion(Criterion.Type.VLAN_VID);
+ return !(ethDst == null && vlanId == null);
+ }
+
protected NextGroup getGroupForNextObjective(Integer nextId) {
NextGroup next = flowObjectiveStore.getNextGroup(nextId);
if (next != null) {
@@ -1147,7 +1174,7 @@
}
// add port information for last group in group-chain
List<Instruction> lastGroupIns = new ArrayList<Instruction>();
- if (lastGroup != null) {
+ if (lastGroup != null && !lastGroup.buckets().buckets().isEmpty()) {
lastGroupIns = lastGroup.buckets().buckets().get(0)
.treatment().allInstructions();
}
@@ -1189,4 +1216,30 @@
}
return null;
}
+
+ /**
+ * Utility class that retries sending flows a fixed number of times, even if
+ * some of the attempts are successful. Used only for forwarding objectives.
+ */
+ protected final class RetryFlows implements Runnable {
+ int attempts = MAX_RETRY_ATTEMPTS;
+ private Collection<FlowRule> retryFlows;
+ private ForwardingObjective fwd;
+
+ RetryFlows(ForwardingObjective fwd, Collection<FlowRule> retryFlows) {
+ this.fwd = fwd;
+ this.retryFlows = retryFlows;
+ }
+
+ @Override
+ public void run() {
+ log.info("RETRY FLOWS ATTEMPT# {} for fwd:{} rules:{}",
+ MAX_RETRY_ATTEMPTS - attempts, fwd.id(), retryFlows.size());
+ sendForward(fwd, retryFlows);
+ if (--attempts > 0) {
+ executorService.schedule(this, RETRY_MS, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
}
diff --git a/drivers/default/src/main/resources/onos-drivers.xml b/drivers/default/src/main/resources/onos-drivers.xml
index b51f087..78894e6 100644
--- a/drivers/default/src/main/resources/onos-drivers.xml
+++ b/drivers/default/src/main/resources/onos-drivers.xml
@@ -116,16 +116,6 @@
<behaviour api="org.onosproject.net.behaviour.Pipeliner"
impl="org.onosproject.driver.pipeline.CpqdOfdpa2VlanPipeline"/>
</driver>
- <!-- Emulation of the ofdpa pipeline using a CPqD OF 1.3 software switch.
- ~ Use this driver when MPLS functionality is required.
- ~ To use this driver, configure ONOS with the dpid of the device.
- -->
- <driver name="ofdpa3-cpqd" extends="default"
- manufacturer="ONF"
- hwVersion="OF1.3 Software Switch from CPqD" swVersion="for Group Chaining">
- <behaviour api="org.onosproject.net.behaviour.Pipeliner"
- impl="org.onosproject.driver.pipeline.CpqdOfdpa3Pipeline"/>
- </driver>
<driver name="celestica" extends="default"
manufacturer="PMC GPON Networks" hwVersion="PAS5211 v2" swVersion="vOLT version 1.5.3.*">