CORD-354 OF-DPA support for link-failures.
Bug fix in flowObjectives store. Adding a removeNextGroup API to the store.
Change-Id: I5890411e5b4eabdc057402687ada26e539500f8f
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index 62722f0..7bcdfeb 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -597,11 +597,20 @@
} else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
- if (deviceService.isAvailable(((Device) event.subject()).id())) {
+ DeviceId deviceId = ((Device) event.subject()).id();
+ if (deviceService.isAvailable(deviceId)) {
log.info("Processing device event {} for available device {}",
event.type(), ((Device) event.subject()).id());
processDeviceAdded((Device) event.subject());
- }
+ } /* else {
+ if (event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED) {
+ // availability changed and not available - dev gone
+ DefaultGroupHandler groupHandler = groupHandlerMap.get(deviceId);
+ if (groupHandler != null) {
+ groupHandler.removeAllGroups();
+ }
+ }
+ }*/
} else if (event.type() == DeviceEvent.Type.PORT_REMOVED) {
processPortRemoved((Device) event.subject(),
((DeviceEvent) event).port());
@@ -655,7 +664,8 @@
log.debug("A link {} was removed", link.toString());
DefaultGroupHandler groupHandler = groupHandlerMap.get(link.src().deviceId());
if (groupHandler != null) {
- groupHandler.portDown(link.src().port());
+ groupHandler.portDown(link.src().port(),
+ mastershipService.isLocalMaster(link.src().deviceId()));
}
log.trace("Starting optimized route population process");
defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(link);
@@ -711,7 +721,8 @@
log.debug("Port {} was removed", port.toString());
DefaultGroupHandler groupHandler = groupHandlerMap.get(device.id());
if (groupHandler != null) {
- groupHandler.portDown(port.number());
+ groupHandler.portDown(port.number(),
+ mastershipService.isLocalMaster(device.id()));
}
}
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
index bc394b8..2986c50 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/grouphandler/DefaultGroupHandler.java
@@ -32,11 +32,13 @@
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.MplsLabel;
+import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
import org.onosproject.net.PortNumber;
+import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
@@ -49,6 +51,7 @@
import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.GroupKey;
import org.onosproject.net.link.LinkService;
+import org.onosproject.segmentrouting.SegmentRoutingManager;
import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
import org.onosproject.segmentrouting.config.DeviceProperties;
import org.onosproject.store.service.EventuallyConsistentMap;
@@ -71,9 +74,11 @@
protected MacAddress nodeMacAddr = null;
protected LinkService linkService;
protected FlowObjectiveService flowObjectiveService;
-
+ // local store for neighbor-device-ids and the set of ports on this device
+ // that connect to the same neighbor
protected ConcurrentHashMap<DeviceId, Set<PortNumber>> devicePortMap =
new ConcurrentHashMap<>();
+ //local store for ports on this device connected to neighbor-device-id
protected ConcurrentHashMap<PortNumber, DeviceId> portDeviceMap =
new ConcurrentHashMap<>();
protected EventuallyConsistentMap<
@@ -225,26 +230,33 @@
deviceId,
nsSet);
for (NeighborSet ns : nsSet) {
- // Create the new bucket to be updated
- TrafficTreatment.Builder tBuilder =
- DefaultTrafficTreatment.builder();
- tBuilder.setOutput(newLink.src().port())
- .setEthDst(dstMac)
- .setEthSrc(nodeMacAddr);
- if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
- tBuilder.pushMpls()
- .copyTtlOut()
- .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
- }
-
Integer nextId = nsNextObjStore.
get(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
if (nextId != null && isMaster) {
- NextObjective.Builder nextObjBuilder = DefaultNextObjective
- .builder().withId(nextId)
- .withType(NextObjective.Type.HASHED).fromApp(appId);
+ // Create the new bucket to be updated
+ TrafficTreatment.Builder tBuilder =
+ DefaultTrafficTreatment.builder();
+ tBuilder.setOutput(newLink.src().port())
+ .setEthDst(dstMac)
+ .setEthSrc(nodeMacAddr);
+ if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+ tBuilder.pushMpls()
+ .copyTtlOut()
+ .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+ }
+ // setup metadata to pass to nextObjective - indicate the vlan on egress
+ // if needed by the switch pipeline. Since hashed next-hops are always to
+ // other neighboring routers, there is no subnet assigned on those ports.
+ TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
+ metabuilder.matchVlanId(
+ VlanId.vlanId(SegmentRoutingManager.ASSIGNED_VLAN_NO_SUBNET));
- nextObjBuilder.addTreatment(tBuilder.build());
+ NextObjective.Builder nextObjBuilder = DefaultNextObjective.builder()
+ .withId(nextId)
+ .withType(NextObjective.Type.HASHED)
+ .addTreatment(tBuilder.build())
+ .withMeta(metabuilder.build())
+ .fromApp(appId);
log.info("**linkUp in device {}: Adding Bucket "
+ "with Port {} to next object id {}",
deviceId,
@@ -253,6 +265,18 @@
NextObjective nextObjective = nextObjBuilder.
addToExisting(new SRNextObjectiveContext(deviceId));
flowObjectiveService.next(deviceId, nextObjective);
+
+ // the addition of a bucket may actually change the neighborset
+ // update the global store
+ /*
+ Set<DeviceId> neighbors = new HashSet<DeviceId>(ns.getDeviceIds());
+ boolean newadd = neighbors.add(newLink.dst().deviceId());
+ if (newadd) {
+ NeighborSet nsnew = new NeighborSet(neighbors, ns.getEdgeLabel());
+ nsNextObjStore.put(new NeighborSetNextObjectiveStoreKey(deviceId, nsnew),
+ nextId);
+ nsNextObjStore.remove(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
+ }*/
} else if (isMaster) {
log.warn("linkUp in device {}, but global store has no record "
+ "for neighbor-set {}", deviceId, ns);
@@ -265,7 +289,7 @@
*
* @param port port number that has gone down
*/
- public void portDown(PortNumber port) {
+ public void portDown(PortNumber port, boolean isMaster) {
if (portDeviceMap.get(port) == null) {
log.warn("portDown: unknown port");
return;
@@ -292,40 +316,50 @@
.filter((ns) -> (ns.getDeviceIds()
.contains(portDeviceMap.get(port))))
.collect(Collectors.toSet());
- log.trace("portDown: nsNextObjStore contents for device {}:",
- deviceId,
- nsSet);
+ log.debug("portDown: nsNextObjStore contents for device {}:{}",
+ deviceId, nsSet);
for (NeighborSet ns : nsSet) {
- // Create the bucket to be removed
- TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment
- .builder();
- tBuilder.setOutput(port)
- .setEthDst(dstMac)
- .setEthSrc(nodeMacAddr);
- if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
- tBuilder.pushMpls()
- .copyTtlOut()
- .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
- }
-
Integer nextId = nsNextObjStore.
get(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
- if (nextId != null) {
- NextObjective.Builder nextObjBuilder = DefaultNextObjective
- .builder().withType(NextObjective.Type.SIMPLE).withId(nextId).fromApp(appId);
-
- nextObjBuilder.addTreatment(tBuilder.build());
-
+ if (nextId != null && isMaster) {
log.info("**portDown in device {}: Removing Bucket "
+ "with Port {} to next object id {}",
deviceId,
port,
nextId);
- // should do removefromexisting and only if master
- /*NextObjective nextObjective = nextObjBuilder.
- remove(new SRNextObjectiveContext(deviceId));
+ // Create the bucket to be removed
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment
+ .builder();
+ tBuilder.setOutput(port)
+ .setEthDst(dstMac)
+ .setEthSrc(nodeMacAddr);
+ if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
+ tBuilder.pushMpls()
+ .copyTtlOut()
+ .setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
+ }
+ NextObjective.Builder nextObjBuilder = DefaultNextObjective
+ .builder()
+ .withType(NextObjective.Type.HASHED) //same as original
+ .withId(nextId)
+ .fromApp(appId)
+ .addTreatment(tBuilder.build());
+ NextObjective nextObjective = nextObjBuilder.
+ removeFromExisting(new SRNextObjectiveContext(deviceId));
- flowObjectiveService.next(deviceId, nextObjective);*/
+ flowObjectiveService.next(deviceId, nextObjective);
+
+ // the removal of a bucket may actually change the neighborset
+ // update the global store
+ /*
+ Set<DeviceId> neighbors = new HashSet<DeviceId>(ns.getDeviceIds());
+ boolean removed = neighbors.remove(portDeviceMap.get(port));
+ if (removed) {
+ NeighborSet nsnew = new NeighborSet(neighbors, ns.getEdgeLabel());
+ nsNextObjStore.put(new NeighborSetNextObjectiveStoreKey(deviceId, nsnew),
+ nextId);
+ nsNextObjStore.remove(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
+ }*/
}
}
@@ -718,6 +752,22 @@
return false;
}
+ public void removeAllGroups() {
+ for (Map.Entry<NeighborSetNextObjectiveStoreKey, Integer> entry:
+ nsNextObjStore.entrySet()) {
+ removeGroup(entry.getValue());
+ }
+ for (Map.Entry<PortNextObjectiveStoreKey, Integer> entry:
+ portNextObjStore.entrySet()) {
+ removeGroup(entry.getValue());
+ }
+ for (Map.Entry<SubnetNextObjectiveStoreKey, Integer> entry:
+ subnetNextObjStore.entrySet()) {
+ removeGroup(entry.getValue());
+ }
+ // should probably clean local stores port-neighbor
+ }
+
protected static class SRNextObjectiveContext implements ObjectiveContext {
final DeviceId deviceId;
diff --git a/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveStore.java b/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveStore.java
index ecf5d73..85dec0f 100644
--- a/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveStore.java
+++ b/core/api/src/main/java/org/onosproject/net/flowobjective/FlowObjectiveStore.java
@@ -27,7 +27,8 @@
extends Store<ObjectiveEvent, FlowObjectiveStoreDelegate> {
/**
- * Adds a NextGroup to the store.
+ * Adds a NextGroup to the store, by mapping it to the nextId as key,
+ * and replacing any previous mapping.
*
* @param nextId an integer
* @param group a next group opaque object
@@ -36,12 +37,22 @@
/**
* Fetch a next group from the store.
- * @param nextId an integer
- * @return a next group
+ *
+ * @param nextId an integer used as key
+ * @return a next group, or null if group was not found
*/
NextGroup getNextGroup(Integer nextId);
/**
+ * Remove a next group mapping from the store.
+ *
+ * @param nextId the key to remove from the store.
+ * @return the next group which mapped to the nextId and is now removed, or
+ * null if no group mapping existed in the store
+ */
+ NextGroup removeNextGroup(Integer nextId);
+
+ /**
* Allocates a next objective id. This id is globally unique
*
* @return an integer
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 33200b1..6a0d3e1 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -48,6 +48,7 @@
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.flowobjective.ObjectiveEvent;
+import org.onosproject.net.flowobjective.ObjectiveEvent.Type;
import org.onosproject.net.group.GroupService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -381,19 +382,19 @@
private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
@Override
public void notify(ObjectiveEvent event) {
- log.debug("Received notification of obj event {}", event);
- Set<PendingNext> pending = pendingForwards.remove(event.subject());
+ if (event.type() == Type.ADD) {
+ log.debug("Received notification of obj event {}", event);
+ Set<PendingNext> pending = pendingForwards.remove(event.subject());
- if (pending == null) {
- log.debug("Nothing pending for this obj event");
- return;
+ if (pending == null) {
+ log.debug("Nothing pending for this obj event");
+ return;
+ }
+
+ log.debug("Processing pending forwarding objectives {}", pending.size());
+ pending.forEach(p -> getDevicePipeliner(p.deviceId())
+ .forward(p.forwardingObjective()));
}
-
- log.debug("Processing pending forwarding objectives {}", pending.size());
-
- pending.forEach(p -> getDevicePipeliner(p.deviceId())
- .forward(p.forwardingObjective()));
-
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java b/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java
index e8ea24f..87b1058 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flowobjective/impl/DistributedFlowObjectiveStore.java
@@ -79,10 +79,9 @@
log.info("Stopped");
}
-
@Override
public void putNextGroup(Integer nextId, NextGroup group) {
- nextGroups.putIfAbsent(nextId, group.data());
+ nextGroups.put(nextId, group.data());
notifyDelegate(new ObjectiveEvent(ObjectiveEvent.Type.ADD, nextId));
}
@@ -96,6 +95,16 @@
}
@Override
+ public NextGroup removeNextGroup(Integer nextId) {
+ Versioned<byte[]> versionGroup = nextGroups.remove(nextId);
+ if (versionGroup != null) {
+ notifyDelegate(new ObjectiveEvent(ObjectiveEvent.Type.REMOVE, nextId));
+ return new DefaultNextGroup(versionGroup.value());
+ }
+ return null;
+ }
+
+ @Override
public int allocateNextId() {
return (int) nextIds.incrementAndGet();
}
diff --git a/drivers/src/main/java/org/onosproject/driver/pipeline/CpqdOFDPA2Pipeline.java b/drivers/src/main/java/org/onosproject/driver/pipeline/CpqdOFDPA2Pipeline.java
index 82aa55e..8fd9097 100644
--- a/drivers/src/main/java/org/onosproject/driver/pipeline/CpqdOFDPA2Pipeline.java
+++ b/drivers/src/main/java/org/onosproject/driver/pipeline/CpqdOFDPA2Pipeline.java
@@ -71,7 +71,6 @@
* (non-Javadoc)
* @see org.onosproject.driver.pipeline.OFDPA2Pipeline#processVlanIdFilter
*/
-
@Override
protected List<FlowRule> processVlanIdFilter(PortCriterion portCriterion,
VlanIdCriterion vidCriterion,
@@ -267,16 +266,18 @@
}
if (fwd.nextId() != null) {
- NextGroup next = flowObjectiveStore.getNextGroup(fwd.nextId());
- List<Deque<GroupKey>> gkeys = appKryo.deserialize(next.data());
- // we only need the top level group's key to point the flow to it
- Group group = groupService.getGroup(deviceId, gkeys.get(0).peekFirst());
- if (group == null) {
- log.warn("The group left!");
- fail(fwd, ObjectiveError.GROUPMISSING);
- return Collections.emptySet();
+ NextGroup next = getGroupForNextObjective(fwd.nextId());
+ if (next != null) {
+ List<Deque<GroupKey>> gkeys = appKryo.deserialize(next.data());
+ // we only need the top level group's key to point the flow to it
+ Group group = groupService.getGroup(deviceId, gkeys.get(0).peekFirst());
+ if (group == null) {
+ log.warn("The group left!");
+ fail(fwd, ObjectiveError.GROUPMISSING);
+ return Collections.emptySet();
+ }
+ tb.deferred().group(group.id());
}
- tb.deferred().group(group.id());
}
tb.transition(ACL_TABLE);
FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
diff --git a/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java b/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java
index c2b07cd..cb1a650 100644
--- a/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java
+++ b/drivers/src/main/java/org/onosproject/driver/pipeline/OFDPA2Pipeline.java
@@ -94,7 +94,6 @@
import org.onosproject.net.group.GroupKey;
import org.onosproject.net.group.GroupListener;
import org.onosproject.net.group.GroupService;
-import org.onosproject.net.packet.PacketService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
@@ -149,7 +148,6 @@
protected FlowObjectiveStore flowObjectiveStore;
protected DeviceId deviceId;
protected ApplicationId driverId;
- protected PacketService packetService;
protected DeviceService deviceService;
protected KryoNamespace appKryo = new KryoNamespace.Builder()
.register(KryoNamespaces.API)
@@ -174,6 +172,10 @@
Map<VlanId, Set<PortNumber>> vlan2Port = new ConcurrentHashMap<VlanId,
Set<PortNumber>>();
+ // local store for pending bucketAdds - by design there can only be one
+ // pending bucket for a group
+ ConcurrentHashMap<Integer, NextObjective> pendingBuckets = new ConcurrentHashMap<>();
+
// index number for group creation
AtomicInteger l3vpnindex = new AtomicInteger(0);
@@ -202,7 +204,6 @@
flowRuleService = serviceDirectory.get(FlowRuleService.class);
groupService = serviceDirectory.get(GroupService.class);
flowObjectiveStore = context.store();
- packetService = serviceDirectory.get(PacketService.class);
deviceService = serviceDirectory.get(DeviceService.class);
groupService.addListener(new InnerGroupListener());
@@ -293,10 +294,13 @@
if (nextGroup != null) {
log.debug("Processing NextObjective id{} in dev{} - add bucket",
nextObjective.id(), deviceId);
- addBucketToGroup(nextObjective);
+ addBucketToGroup(nextObjective, nextGroup);
} else {
// it is possible that group-chain has not been fully created yet
- waitToAddBucketToGroup(nextObjective);
+ log.debug("Waiting to add bucket to group for next-id:{} in dev:{}",
+ nextObjective.id(), deviceId);
+ // by design only one pending bucket is allowed for the group
+ pendingBuckets.put(nextObjective.id(), nextObjective);
}
break;
case REMOVE:
@@ -307,7 +311,7 @@
}
log.debug("Processing NextObjective id{} in dev{} - remove group",
nextObjective.id(), deviceId);
- removeGroup(nextObjective);
+ removeGroup(nextObjective, nextGroup);
break;
case REMOVE_FROM_EXISTING:
if (nextGroup == null) {
@@ -317,7 +321,7 @@
}
log.debug("Processing NextObjective id{} in dev{} - remove bucket",
nextObjective.id(), deviceId);
- removeBucketFromGroup(nextObjective);
+ removeBucketFromGroup(nextObjective, nextGroup);
break;
default:
log.warn("Unsupported operation {}", nextObjective.op());
@@ -791,17 +795,19 @@
return Collections.emptySet();
}
- NextGroup next = flowObjectiveStore.getNextGroup(fwd.nextId());
- List<Deque<GroupKey>> gkeys = appKryo.deserialize(next.data());
- // we only need the top level group's key to point the flow to it
- Group group = groupService.getGroup(deviceId, gkeys.get(0).peekFirst());
- if (group == null) {
- log.warn("Group with key:{} for next-id:{} not found in dev:{}",
- gkeys.get(0).peekFirst(), fwd.nextId(), deviceId);
- fail(fwd, ObjectiveError.GROUPMISSING);
- return Collections.emptySet();
+ NextGroup next = getGroupForNextObjective(fwd.nextId());
+ if (next != null) {
+ List<Deque<GroupKey>> gkeys = appKryo.deserialize(next.data());
+ // we only need the top level group's key to point the flow to it
+ Group group = groupService.getGroup(deviceId, gkeys.get(0).peekFirst());
+ if (group == null) {
+ log.warn("Group with key:{} for next-id:{} not found in dev:{}",
+ gkeys.get(0).peekFirst(), fwd.nextId(), deviceId);
+ fail(fwd, ObjectiveError.GROUPMISSING);
+ return Collections.emptySet();
+ }
+ tb.deferred().group(group.id());
}
- tb.deferred().group(group.id());
}
tb.transition(ACL_TABLE);
FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
@@ -868,7 +874,7 @@
TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
if (fwd.nextId() != null) {
- NextGroup next = flowObjectiveStore.getNextGroup(fwd.nextId());
+ NextGroup next = getGroupForNextObjective(fwd.nextId());
if (next != null) {
List<Deque<GroupKey>> gkeys = appKryo.deserialize(next.data());
// we only need the top level group's key to point the flow to it
@@ -903,6 +909,23 @@
return rules;
}
+ protected NextGroup getGroupForNextObjective(Integer nextId) {
+ NextGroup next = flowObjectiveStore.getNextGroup(nextId);
+ if (next != null) {
+ List<Deque<GroupKey>> gkeys = appKryo.deserialize(next.data());
+ if (gkeys != null && !gkeys.isEmpty()) {
+ return next;
+ } else {
+ log.warn("Empty next group found in FlowObjective store for "
+ + "next-id:{} in dev:{}", nextId, deviceId);
+ }
+ } else {
+ log.warn("next-id {} not found in Flow objective store for dev:{}",
+ nextId, deviceId);
+ }
+ return null;
+ }
+
private void pass(Objective obj) {
if (obj.context().isPresent()) {
obj.context().get().onSuccess(obj);
@@ -1013,6 +1036,16 @@
}
}
+ private void updatePendingGroups(GroupKey gkey, GroupChainElem gce) {
+ Set<GroupChainElem> gceSet = Collections.newSetFromMap(
+ new ConcurrentHashMap<GroupChainElem, Boolean>());
+ gceSet.add(gce);
+ Set<GroupChainElem> retval = pendingGroups.putIfAbsent(gkey, gceSet);
+ if (retval != null) {
+ retval.add(gce);
+ }
+ }
+
/**
* Creates a simple L2 Interface Group.
*
@@ -1242,14 +1275,8 @@
}
// store l2groupkey with the groupChainElem for the outer-group that depends on it
- GroupChainElem gce = new GroupChainElem(outerGrpDesc, 1);
- Set<GroupChainElem> gceSet = Collections.newSetFromMap(
- new ConcurrentHashMap<GroupChainElem, Boolean>());
- gceSet.add(gce);
- Set<GroupChainElem> retval = pendingGroups.putIfAbsent(l2groupkey, gceSet);
- if (retval != null) {
- retval.add(gce);
- }
+ GroupChainElem gce = new GroupChainElem(outerGrpDesc, 1, false);
+ updatePendingGroups(l2groupkey, gce);
// create group description for the inner l2interfacegroup
GroupBucket l2interfaceGroupBucket =
@@ -1376,7 +1403,8 @@
l2floodgroupId,
nextObj.appId());
GroupChainElem gce = new GroupChainElem(l2floodGroupDescription,
- l2interfaceGroupDescs.size());
+ l2interfaceGroupDescs.size(),
+ false);
log.debug("Trying L2-Flood: device:{} gid:{} gkey:{} nextid:{}",
deviceId, Integer.toHexString(l2floodgroupId),
l2floodgroupkey, nextObj.id());
@@ -1392,16 +1420,8 @@
for (GroupDescription l2intGrpDesc : l2interfaceGroupDescs) {
// store all l2groupkeys with the groupChainElem for the l2floodgroup
// that depends on it
- Set<GroupChainElem> gceSet = Collections.newSetFromMap(
- new ConcurrentHashMap<GroupChainElem, Boolean>());
- gceSet.add(gce);
- Set<GroupChainElem> retval = pendingGroups.putIfAbsent(
- l2intGrpDesc.appCookie(), gceSet);
- if (retval != null) {
- retval.add(gce);
- }
-
- // create and send groups for all l2 interface groups
+ updatePendingGroups(l2intGrpDesc.appCookie(), gce);
+ // send groups for all l2 interface groups
groupService.addGroup(l2intGrpDesc);
}
}
@@ -1430,17 +1450,81 @@
* <p>
* NOTE: We do not create MPLS ECMP groups as they are unimplemented in
* OF-DPA 2.0 (even though it is in the spec). Therefore we do not
- * check the nextObjective meta.
+ * check the nextObjective meta to see what is matching before being
+ * sent to this nextObjective.
*
* @param nextObj the nextObjective of type HASHED
*/
private void processHashedNextObjective(NextObjective nextObj) {
- // break up hashed next objective to multiple groups
- Collection<TrafficTreatment> buckets = nextObj.next();
-
// storage for all group keys in the chain of groups created
List<Deque<GroupKey>> allGroupKeys = new ArrayList<>();
List<GroupInfo> unsentGroups = new ArrayList<>();
+ createHashBucketChains(nextObj, allGroupKeys, unsentGroups);
+
+ // now we can create the outermost L3 ECMP group
+ List<GroupBucket> l3ecmpGroupBuckets = new ArrayList<>();
+ for (GroupInfo gi : unsentGroups) {
+ // create ECMP bucket to point to the outer group
+ TrafficTreatment.Builder ttb = DefaultTrafficTreatment.builder();
+ ttb.group(new DefaultGroupId(gi.outerGrpDesc.givenGroupId()));
+ GroupBucket sbucket = DefaultGroupBucket
+ .createSelectGroupBucket(ttb.build());
+ l3ecmpGroupBuckets.add(sbucket);
+ }
+ int l3ecmpGroupId = L3ECMPMASK | nextObj.id() << 12;
+ GroupKey l3ecmpGroupKey = new DefaultGroupKey(appKryo.serialize(l3ecmpGroupId));
+ GroupDescription l3ecmpGroupDesc =
+ new DefaultGroupDescription(
+ deviceId,
+ GroupDescription.Type.SELECT,
+ new GroupBuckets(l3ecmpGroupBuckets),
+ l3ecmpGroupKey,
+ l3ecmpGroupId,
+ nextObj.appId());
+ GroupChainElem l3ecmpGce = new GroupChainElem(l3ecmpGroupDesc,
+ l3ecmpGroupBuckets.size(),
+ false);
+
+ // create objects for local and distributed storage
+ allGroupKeys.forEach(gkeyChain -> gkeyChain.addFirst(l3ecmpGroupKey));
+ OfdpaNextGroup ofdpaGrp = new OfdpaNextGroup(allGroupKeys, nextObj);
+
+ // store l3ecmpGroupKey with the ofdpaGroupChain for the nextObjective
+ // that depends on it
+ updatePendingNextObjective(l3ecmpGroupKey, ofdpaGrp);
+
+ log.debug("Trying L3ECMP: device:{} gid:{} gkey:{} nextId:{}",
+ deviceId, Integer.toHexString(l3ecmpGroupId),
+ l3ecmpGroupKey, nextObj.id());
+ // finally we are ready to send the innermost groups
+ for (GroupInfo gi : unsentGroups) {
+ log.debug("Sending innermost group {} in group chain on device {} ",
+ Integer.toHexString(gi.innerGrpDesc.givenGroupId()), deviceId);
+ updatePendingGroups(gi.outerGrpDesc.appCookie(), l3ecmpGce);
+ groupService.addGroup(gi.innerGrpDesc);
+ }
+
+ }
+
+ /**
+ * Creates group chains for all buckets in a hashed group, and stores the
+ * GroupInfos and GroupKeys for all the groups in the lists passed in, which
+ * should be empty.
+ * <p>
+ * Does not create the top level ECMP group. Does not actually send the
+ * groups to the groupService.
+ *
+ * @param nextObj the Next Objective with buckets that need to be converted
+ * to group chains
+ * @param allGroupKeys a list to store groupKey for each bucket-group-chain
+ * @param unsentGroups a list to store GroupInfo for each bucket-group-chain
+ */
+ private void createHashBucketChains(NextObjective nextObj,
+ List<Deque<GroupKey>> allGroupKeys,
+ List<GroupInfo> unsentGroups) {
+ // break up hashed next objective to multiple groups
+ Collection<TrafficTreatment> buckets = nextObj.next();
+
for (TrafficTreatment bucket : buckets) {
//figure out how many labels are pushed in each bucket
int labelsPushed = 0;
@@ -1508,15 +1592,8 @@
l3vpngroupkey,
l3vpngroupId,
nextObj.appId());
- GroupChainElem l3vpnGce = new GroupChainElem(l3vpnGroupDesc, 1);
- Set<GroupChainElem> gceSet = Collections.newSetFromMap(
- new ConcurrentHashMap<GroupChainElem, Boolean>());
- gceSet.add(l3vpnGce);
- Set<GroupChainElem> retval = pendingGroups
- .putIfAbsent(onelabelGroupInfo.outerGrpDesc.appCookie(), gceSet);
- if (retval != null) {
- retval.add(l3vpnGce);
- }
+ GroupChainElem l3vpnGce = new GroupChainElem(l3vpnGroupDesc, 1, false);
+ updatePendingGroups(onelabelGroupInfo.outerGrpDesc.appCookie(), l3vpnGce);
gkeyChain.addFirst(onelabelGroupInfo.innerGrpDesc.appCookie());
gkeyChain.addFirst(onelabelGroupInfo.outerGrpDesc.appCookie());
@@ -1535,80 +1612,186 @@
} else {
log.warn("Driver currently does not handle more than 1 MPLS "
- + "labels. Not processing nextObjective {}", nextObj);
+ + "labels. Not processing nextObjective {}", nextObj.id());
return;
}
// all groups in this chain
allGroupKeys.add(gkeyChain);
}
+ }
- // now we can create the outermost L3 ECMP group
- List<GroupBucket> l3ecmpGroupBuckets = new ArrayList<>();
- for (GroupInfo gi : unsentGroups) {
- // create ECMP bucket to point to the outer group
- TrafficTreatment.Builder ttb = DefaultTrafficTreatment.builder();
- ttb.group(new DefaultGroupId(gi.outerGrpDesc.givenGroupId()));
- GroupBucket sbucket = DefaultGroupBucket
- .createSelectGroupBucket(ttb.build());
- l3ecmpGroupBuckets.add(sbucket);
+ /**
+ * Adds a bucket to the top level group of a group-chain, and creates the chain.
+ *
+ * @param nextObjective the next group to add a bucket to
+ * @param next the representation of the existing group-chain for this next objective
+ */
+ private void addBucketToGroup(NextObjective nextObjective, NextGroup next) {
+ if (nextObjective.type() != NextObjective.Type.HASHED) {
+ log.warn("AddBuckets not applied to nextType:{} in dev:{} for next:{}",
+ nextObjective.type(), deviceId, nextObjective.id());
+ return;
}
- int l3ecmpGroupId = L3ECMPMASK | nextObj.id() << 12;
+ if (nextObjective.next().size() > 1) {
+ log.warn("Only one bucket can be added at a time");
+ return;
+ }
+ // storage for all group keys in the chain of groups created
+ List<Deque<GroupKey>> allGroupKeys = new ArrayList<>();
+ List<GroupInfo> unsentGroups = new ArrayList<>();
+ createHashBucketChains(nextObjective, allGroupKeys, unsentGroups);
+
+ // now we can create the outermost L3 ECMP group bucket to add
+ GroupInfo gi = unsentGroups.get(0); // only one bucket, so only one group-chain
+ TrafficTreatment.Builder ttb = DefaultTrafficTreatment.builder();
+ ttb.group(new DefaultGroupId(gi.outerGrpDesc.givenGroupId()));
+ GroupBucket sbucket = DefaultGroupBucket.createSelectGroupBucket(ttb.build());
+
+ // recreate the original L3 ECMP group id and description
+ int l3ecmpGroupId = L3ECMPMASK | nextObjective.id() << 12;
GroupKey l3ecmpGroupKey = new DefaultGroupKey(appKryo.serialize(l3ecmpGroupId));
+
+ // Although GroupDescriptions are not necessary for adding buckets to
+ // existing groups, we use one in the GroupChainElem. When the latter is
+ // processed, the info will be extracted for the bucketAdd call to groupService
GroupDescription l3ecmpGroupDesc =
new DefaultGroupDescription(
deviceId,
GroupDescription.Type.SELECT,
- new GroupBuckets(l3ecmpGroupBuckets),
+ new GroupBuckets(Collections.singletonList(sbucket)),
l3ecmpGroupKey,
l3ecmpGroupId,
- nextObj.appId());
- GroupChainElem l3ecmpGce = new GroupChainElem(l3ecmpGroupDesc,
- l3ecmpGroupBuckets.size());
+ nextObjective.appId());
+ GroupChainElem l3ecmpGce = new GroupChainElem(l3ecmpGroupDesc, 1, true);
- // create objects for local and distributed storage
- allGroupKeys.forEach(gkeyChain -> gkeyChain.addFirst(l3ecmpGroupKey));
- OfdpaNextGroup ofdpaGrp = new OfdpaNextGroup(allGroupKeys, nextObj);
+ // update original NextGroup with new bucket-chain
+ // don't need to update pendingNextObjectives -- group already exists
+ Deque<GroupKey> newBucketChain = allGroupKeys.get(0);
+ newBucketChain.addFirst(l3ecmpGroupKey);
+ List<Deque<GroupKey>> allOriginalKeys = appKryo.deserialize(next.data());
+ allOriginalKeys.add(newBucketChain);
+ flowObjectiveStore.putNextGroup(nextObjective.id(),
+ new OfdpaNextGroup(allOriginalKeys, nextObjective));
- // store l3ecmpGroupKey with the ofdpaGroupChain for the nextObjective
- // that depends on it
- updatePendingNextObjective(l3ecmpGroupKey, ofdpaGrp);
-
- log.debug("Trying L3ECMP: device:{} gid:{} gkey:{} nextId:{}",
+ log.debug("Adding to L3ECMP: device:{} gid:{} gkey:{} nextId:{}",
deviceId, Integer.toHexString(l3ecmpGroupId),
- l3ecmpGroupKey, nextObj.id());
- // finally we are ready to send the innermost groups
- for (GroupInfo gi : unsentGroups) {
- log.debug("Sending innermost group {} in group chain on device {} ",
- Integer.toHexString(gi.innerGrpDesc.givenGroupId()), deviceId);
- Set<GroupChainElem> gceSet = Collections.newSetFromMap(
- new ConcurrentHashMap<GroupChainElem, Boolean>());
- gceSet.add(l3ecmpGce);
- Set<GroupChainElem> retval = pendingGroups
- .putIfAbsent(gi.outerGrpDesc.appCookie(), gceSet);
- if (retval != null) {
- retval.add(l3ecmpGce);
- }
+ l3ecmpGroupKey, nextObjective.id());
+ // send the innermost group
+ log.debug("Sending innermost group {} in group chain on device {} ",
+ Integer.toHexString(gi.innerGrpDesc.givenGroupId()), deviceId);
+ updatePendingGroups(gi.outerGrpDesc.appCookie(), l3ecmpGce);
+ groupService.addGroup(gi.innerGrpDesc);
- groupService.addGroup(gi.innerGrpDesc);
+ }
+
+ /**
+ * Removes the bucket in the top level group of a possible group-chain. Does
+ * not remove the groups in a group-chain pointed to by this bucket, as they
+ * may be in use (referenced by other groups) elsewhere.
+ *
+ * @param nextObjective the next group to remove a bucket from
+ * @param next the representation of the existing group-chain for this next objective
+ */
+ private void removeBucketFromGroup(NextObjective nextObjective, NextGroup next) {
+ if (nextObjective.type() != NextObjective.Type.HASHED) {
+ log.warn("RemoveBuckets not applied to nextType:{} in dev:{} for next:{}",
+ nextObjective.type(), deviceId, nextObjective.id());
+ return;
+ }
+ Collection<TrafficTreatment> treatments = nextObjective.next();
+ TrafficTreatment treatment = treatments.iterator().next();
+ // find the bucket to remove by noting the outport, and figuring out the
+ // top-level group in the group-chain that indirectly references the port
+ PortNumber outport = null;
+ for (Instruction ins : treatment.allInstructions()) {
+ if (ins instanceof OutputInstruction) {
+ outport = ((OutputInstruction) ins).port();
+ break;
+ }
+ }
+ if (outport == null) {
+ log.error("next objective {} has no outport", nextObjective.id());
+ return;
}
+ List<Deque<GroupKey>> allgkeys = appKryo.deserialize(next.data());
+ Deque<GroupKey> foundChain = null;
+ int index = 0;
+ for (Deque<GroupKey> gkeys : allgkeys) {
+ GroupKey groupWithPort = gkeys.peekLast();
+ Group group = groupService.getGroup(deviceId, groupWithPort);
+ if (group == null) {
+ log.warn("Inconsistent group chain");
+ continue;
+ }
+ // last group in group chain should have a single bucket pointing to port
+ List<Instruction> lastIns = group.buckets().buckets().iterator()
+ .next().treatment().allInstructions();
+ for (Instruction i : lastIns) {
+ if (i instanceof OutputInstruction) {
+ PortNumber lastport = ((OutputInstruction) i).port();
+ if (lastport.equals(outport)) {
+ foundChain = gkeys;
+ break;
+ }
+ }
+ }
+ if (foundChain != null) {
+ break;
+ }
+ index++;
+ }
+ if (foundChain != null) {
+ //first groupkey is the one we want to modify
+ GroupKey modGroupKey = foundChain.peekFirst();
+ Group modGroup = groupService.getGroup(deviceId, modGroupKey);
+ //second groupkey is the one we wish to remove the reference to
+ GroupKey pointedGroupKey = null;
+ int i = 0;
+ for (GroupKey gk : foundChain) {
+ if (i++ == 1) {
+ pointedGroupKey = gk;
+ break;
+ }
+ }
+ Group pointedGroup = groupService.getGroup(deviceId, pointedGroupKey);
+ GroupBucket bucket = DefaultGroupBucket.createSelectGroupBucket(
+ DefaultTrafficTreatment.builder()
+ .group(pointedGroup.id())
+ .build());
+ GroupBuckets removeBuckets = new GroupBuckets(Collections
+ .singletonList(bucket));
+ log.debug("Removing buckets from group id {} for next id {} in device {}",
+ modGroup.id(), nextObjective.id(), deviceId);
+ groupService.removeBucketsFromGroup(deviceId, modGroupKey,
+ removeBuckets, modGroupKey,
+ nextObjective.appId());
+ //update store
+ allgkeys.remove(index);
+ flowObjectiveStore.putNextGroup(nextObjective.id(),
+ new OfdpaNextGroup(allgkeys, nextObjective));
+ } else {
+ log.warn("Could not find appropriate group-chain for removing bucket"
+ + " for next id {} in dev:{}", nextObjective.id(), deviceId);
+ }
}
- private void addBucketToGroup(NextObjective nextObjective) {
- // TODO Auto-generated method stub
- }
-
- private void waitToAddBucketToGroup(NextObjective nextObjective) {
- // TODO Auto-generated method stub
- }
-
- private void removeBucketFromGroup(NextObjective nextObjective) {
- // TODO Auto-generated method stub
- }
-
- private void removeGroup(NextObjective nextObjective) {
- // TODO Auto-generated method stub
+ /**
+ * Removes all groups in multiple possible group-chains that represent the next
+ * objective.
+ *
+ * @param nextObjective the next objective to remove
+ * @param next the NextGroup that represents the existing group-chain for
+ * this next objective
+ */
+ private void removeGroup(NextObjective nextObjective, NextGroup next) {
+ List<Deque<GroupKey>> allgkeys = appKryo.deserialize(next.data());
+ allgkeys.forEach(groupChain -> {
+ groupChain.forEach(groupKey ->
+ groupService.removeGroup(deviceId, groupKey, nextObjective.appId()));
+ });
+ flowObjectiveStore.removeNextGroup(nextObjective.id());
}
/**
@@ -1617,7 +1800,7 @@
* and this driver has received notification for it. A second assumption is
* that if there is another group waiting for this group then the appropriate
* stores already have the information to act upon the notification for the
- * creating of this group.
+ * creation of this group.
* <p>
* The processing of the GroupChainElement depends on the number of groups
* this element is waiting on. For all group types other than SIMPLE, a
@@ -1632,7 +1815,15 @@
return;
}
log.debug("GCE: {} ready to be processed", gce);
- groupService.addGroup(gce.groupDescription);
+ if (gce.addBucketToGroup) {
+ groupService.addBucketsToGroup(gce.groupDescription.deviceId(),
+ gce.groupDescription.appCookie(),
+ gce.groupDescription.buckets(),
+ gce.groupDescription.appCookie(),
+ gce.groupDescription.appId());
+ } else {
+ groupService.addGroup(gce.groupDescription);
+ }
}
private class GroupChecker implements Runnable {
@@ -1646,33 +1837,45 @@
.collect(Collectors.toSet());
keys.addAll(otherkeys);
- keys.stream().forEach(key -> {
- //first check for group chain
- Set<GroupChainElem> gceSet = pendingGroups.remove(key);
- if (gceSet != null) {
- for (GroupChainElem gce : gceSet) {
- log.info("Group service processed group key {} in device {}. "
- + "Processing next group in group chain with group id {}",
- key, deviceId,
- Integer.toHexString(gce.groupDescription.givenGroupId()));
- processGroupChain(gce);
+ keys.stream().forEach(key ->
+ processPendingGroupsOrNextObjectives(key, false));
+ }
+ }
+
+ private void processPendingGroupsOrNextObjectives(GroupKey key, boolean added) {
+ //first check for group chain
+ Set<GroupChainElem> gceSet = pendingGroups.remove(key);
+ if (gceSet != null) {
+ for (GroupChainElem gce : gceSet) {
+ log.info("Group service {} group key {} in device {}. "
+ + "Processing next group in group chain with group id {}",
+ (added) ? "ADDED" : "processed",
+ key, deviceId,
+ Integer.toHexString(gce.groupDescription.givenGroupId()));
+ processGroupChain(gce);
+ }
+ } else {
+ // otherwise chain complete - check for waiting nextObjectives
+ List<OfdpaNextGroup> nextGrpList = pendingNextObjectives.getIfPresent(key);
+ if (nextGrpList != null) {
+ pendingNextObjectives.invalidate(key);
+ nextGrpList.forEach(nextGrp -> {
+ log.info("Group service {} group key {} in device:{}. "
+ + "Done implementing next objective: {} <<-->> gid:{}",
+ (added) ? "ADDED" : "processed",
+ key, deviceId, nextGrp.nextObjective().id(),
+ Integer.toHexString(groupService.getGroup(deviceId, key)
+ .givenGroupId()));
+ pass(nextGrp.nextObjective());
+ flowObjectiveStore.putNextGroup(nextGrp.nextObjective().id(), nextGrp);
+ // check if addBuckets waiting for this completion
+ NextObjective pendBkt = pendingBuckets
+ .remove(nextGrp.nextObjective().id());
+ if (pendBkt != null) {
+ addBucketToGroup(pendBkt, nextGrp);
}
- } else {
- List<OfdpaNextGroup> objList = pendingNextObjectives.getIfPresent(key);
- if (objList != null) {
- pendingNextObjectives.invalidate(key);
- objList.forEach(obj -> {
- log.info("Group service processed group key {} in device:{}. "
- + "Done implementing next objective: {} <<-->> gid:{}",
- key, deviceId, obj.nextObjective().id(),
- Integer.toHexString(groupService.getGroup(deviceId, key)
- .givenGroupId()));
- pass(obj.nextObjective());
- flowObjectiveStore.putNextGroup(obj.nextObjective().id(), obj);
- });
- }
- }
- });
+ });
+ }
}
}
@@ -1682,31 +1885,7 @@
log.trace("received group event of type {}", event.type());
if (event.type() == GroupEvent.Type.GROUP_ADDED) {
GroupKey key = event.subject().appCookie();
- // first check for group chain
- Set<GroupChainElem> gceSet = pendingGroups.remove(key);
- if (gceSet != null) {
- for (GroupChainElem gce : gceSet) {
- log.info("group ADDED with group key {} .. "
- + "Processing next group in group chain with group key {}",
- key,
- gce.groupDescription.appCookie());
- processGroupChain(gce);
- }
- } else {
- List<OfdpaNextGroup> objList = pendingNextObjectives.getIfPresent(key);
- if (objList != null) {
- pendingNextObjectives.invalidate(key);
- objList.forEach(obj -> {
- log.info("group ADDED with key {} in dev {}.. Done implementing next "
- + "objective: {} <<-->> gid:{}",
- key, deviceId, obj.nextObjective().id(),
- Integer.toHexString(groupService.getGroup(deviceId, key)
- .givenGroupId()));
- pass(obj.nextObjective());
- flowObjectiveStore.putNextGroup(obj.nextObjective().id(), obj);
- });
- }
- }
+ processPendingGroupsOrNextObjectives(key, true);
}
}
}
@@ -1714,7 +1893,7 @@
/**
* Represents an entire group-chain that implements a Next-Objective from
* the application. The objective is represented as a list of deques, where
- * each deque can is a separate chain of groups.
+ * each deque is a separate chain of groups.
* <p>
* For example, an ECMP group with 3 buckets, where each bucket points to
* a group chain of L3 Unicast and L2 interface groups will look like this:
@@ -1765,10 +1944,13 @@
private class GroupChainElem {
private GroupDescription groupDescription;
private AtomicInteger waitOnGroups;
+ private boolean addBucketToGroup;
- GroupChainElem(GroupDescription groupDescription, int waitOnGroups) {
+ GroupChainElem(GroupDescription groupDescription, int waitOnGroups,
+ boolean addBucketToGroup) {
this.groupDescription = groupDescription;
this.waitOnGroups = new AtomicInteger(waitOnGroups);
+ this.addBucketToGroup = addBucketToGroup;
}
/**
@@ -1788,6 +1970,7 @@
return (Integer.toHexString(groupDescription.givenGroupId()) +
" groupKey: " + groupDescription.appCookie() +
" waiting-on-groups: " + waitOnGroups.get() +
+ " addBucketToGroup: " + addBucketToGroup +
" device: " + deviceId);
}
}