[ONOS-4612]Update SFC flows inline with the Official OVS NSH patch
Change-Id: If58517841096a939860d88aa78eca7cae46b9935
diff --git a/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/impl/SfcManager.java b/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/impl/SfcManager.java
index 0d7702c..3292e23 100644
--- a/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/impl/SfcManager.java
+++ b/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/impl/SfcManager.java
@@ -20,8 +20,9 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import java.util.Optional;
+import java.util.ListIterator;
import java.util.Set;
+import java.util.UUID;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -50,10 +51,7 @@
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
-import org.onosproject.sfc.forwarder.ServiceFunctionForwarderService;
-import org.onosproject.sfc.forwarder.impl.ServiceFunctionForwarderImpl;
-import org.onosproject.sfc.installer.FlowClassifierInstallerService;
-import org.onosproject.sfc.installer.impl.FlowClassifierInstallerImpl;
+import org.onosproject.sfc.installer.impl.SfcFlowRuleInstallerImpl;
import org.onosproject.sfc.manager.SfcService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.DistributedSet;
@@ -77,7 +75,6 @@
import org.onosproject.vtnrsc.VirtualPort;
import org.onosproject.vtnrsc.VirtualPortId;
import org.onosproject.vtnrsc.event.VtnRscEvent;
-import org.onosproject.vtnrsc.event.VtnRscEventFeedback;
import org.onosproject.vtnrsc.event.VtnRscListener;
import org.onosproject.vtnrsc.flowclassifier.FlowClassifierService;
import org.onosproject.vtnrsc.portchain.PortChainService;
@@ -100,7 +97,6 @@
private String nshSpiIdTopic = "nsh-spi-id";
private static final String APP_ID = "org.onosproject.app.vtn";
private static final int SFC_PRIORITY = 1000;
- private static final int NULL_PORT = 0;
private static final int MAX_NSH_SPI_ID = 0x7FFFF;
private static final int MAX_LOAD_BALANCE_ID = 0x20;
@@ -131,10 +127,9 @@
protected SfcPacketProcessor processor = new SfcPacketProcessor();
protected ApplicationId appId;
- protected ServiceFunctionForwarderService serviceFunctionForwarder;
- protected FlowClassifierInstallerService flowClassifierInstaller;
protected IdGenerator nshSpiIdGenerator;
protected EventuallyConsistentMap<PortChainId, Integer> nshSpiPortChainMap;
+ protected EventuallyConsistentMap<PortChainId, List<FiveTuple>> portChainFiveTupleMap;
protected DistributedSet<Integer> nshSpiIdFreeList;
private final VtnRscListener vtnRscListener = new InnerVtnRscListener();
@@ -142,24 +137,22 @@
@Activate
public void activate() {
appId = coreService.registerApplication(APP_ID);
- serviceFunctionForwarder = new ServiceFunctionForwarderImpl(appId);
- flowClassifierInstaller = new FlowClassifierInstallerImpl(appId);
nshSpiIdGenerator = coreService.getIdGenerator(nshSpiIdTopic);
vtnRscService.addListener(vtnRscListener);
- KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
- .register(TenantId.class)
- .register(PortPairId.class)
- .register(PortPairGroupId.class)
- .register(FlowClassifierId.class)
- .register(PortChainId.class);
+ KryoNamespace.Builder serializer = KryoNamespace
+ .newBuilder()
+ .register(PortChainId.class, UUID.class, FiveTuple.class, IpAddress.class, PortNumber.class,
+ DefaultFiveTuple.class, IpAddress.Version.class, TenantId.class);
nshSpiPortChainMap = storageService.<PortChainId, Integer>eventuallyConsistentMapBuilder()
- .withName("nshSpiPortChainMap")
- .withSerializer(serializer)
- .withTimestampProvider((k, v) -> new WallClockTimestamp())
- .build();
+ .withName("nshSpiPortChainMap").withSerializer(serializer)
+ .withTimestampProvider((k, v) ->new WallClockTimestamp()).build();
+
+ portChainFiveTupleMap = storageService.<PortChainId, List<FiveTuple>>eventuallyConsistentMapBuilder()
+ .withName("portChainFiveTupleMap").withSerializer(serializer)
+ .withTimestampProvider((k, v) ->new WallClockTimestamp()).build();
nshSpiIdFreeList = storageService.<Integer>setBuilder()
.withName("nshSpiIdDeletedList")
@@ -186,43 +179,47 @@
public void event(VtnRscEvent event) {
if (VtnRscEvent.Type.PORT_PAIR_PUT == event.type()) {
- PortPair portPair = ((VtnRscEventFeedback) event.subject()).portPair();
+ PortPair portPair = event.subject().portPair();
onPortPairCreated(portPair);
} else if (VtnRscEvent.Type.PORT_PAIR_DELETE == event.type()) {
- PortPair portPair = ((VtnRscEventFeedback) event.subject()).portPair();
+ PortPair portPair = event.subject().portPair();
onPortPairDeleted(portPair);
} else if (VtnRscEvent.Type.PORT_PAIR_UPDATE == event.type()) {
- PortPair portPair = ((VtnRscEventFeedback) event.subject()).portPair();
+ PortPair portPair = event.subject().portPair();
onPortPairDeleted(portPair);
onPortPairCreated(portPair);
} else if (VtnRscEvent.Type.PORT_PAIR_GROUP_PUT == event.type()) {
- PortPairGroup portPairGroup = ((VtnRscEventFeedback) event.subject()).portPairGroup();
+ PortPairGroup portPairGroup = event.subject().portPairGroup();
onPortPairGroupCreated(portPairGroup);
} else if (VtnRscEvent.Type.PORT_PAIR_GROUP_DELETE == event.type()) {
- PortPairGroup portPairGroup = ((VtnRscEventFeedback) event.subject()).portPairGroup();
+ PortPairGroup portPairGroup = event.subject().portPairGroup();
onPortPairGroupDeleted(portPairGroup);
} else if (VtnRscEvent.Type.PORT_PAIR_GROUP_UPDATE == event.type()) {
- PortPairGroup portPairGroup = ((VtnRscEventFeedback) event.subject()).portPairGroup();
+ PortPairGroup portPairGroup = event.subject().portPairGroup();
onPortPairGroupDeleted(portPairGroup);
onPortPairGroupCreated(portPairGroup);
} else if (VtnRscEvent.Type.FLOW_CLASSIFIER_PUT == event.type()) {
- FlowClassifier flowClassifier = ((VtnRscEventFeedback) event.subject()).flowClassifier();
+ FlowClassifier flowClassifier = event.subject().flowClassifier();
onFlowClassifierCreated(flowClassifier);
} else if (VtnRscEvent.Type.FLOW_CLASSIFIER_DELETE == event.type()) {
- FlowClassifier flowClassifier = ((VtnRscEventFeedback) event.subject()).flowClassifier();
+ FlowClassifier flowClassifier = event.subject().flowClassifier();
onFlowClassifierDeleted(flowClassifier);
} else if (VtnRscEvent.Type.FLOW_CLASSIFIER_UPDATE == event.type()) {
- FlowClassifier flowClassifier = ((VtnRscEventFeedback) event.subject()).flowClassifier();
+ FlowClassifier flowClassifier = event.subject().flowClassifier();
onFlowClassifierDeleted(flowClassifier);
onFlowClassifierCreated(flowClassifier);
} else if (VtnRscEvent.Type.PORT_CHAIN_PUT == event.type()) {
- PortChain portChain = (PortChain) ((VtnRscEventFeedback) event.subject()).portChain();
+ PortChain portChain = event.subject().portChain();
+ if (portChain.oldPortChain() != null) {
+ onPortChainDeleted(portChain.oldPortChain());
+ }
onPortChainCreated(portChain);
} else if (VtnRscEvent.Type.PORT_CHAIN_DELETE == event.type()) {
- PortChain portChain = (PortChain) ((VtnRscEventFeedback) event.subject()).portChain();
+ PortChain portChain = event.subject().portChain();
onPortChainDeleted(portChain);
+ portChainFiveTupleMap.remove(portChain.portChainId());
} else if (VtnRscEvent.Type.PORT_CHAIN_UPDATE == event.type()) {
- PortChain portChain = (PortChain) ((VtnRscEventFeedback) event.subject()).portChain();
+ PortChain portChain = event.subject().portChain();
onPortChainDeleted(portChain);
onPortChainCreated(portChain);
}
@@ -232,58 +229,69 @@
@Override
public void onPortPairCreated(PortPair portPair) {
log.debug("onPortPairCreated");
- // TODO: Modify forwarding rule on port-pair creation.
+ // Do nothing
}
@Override
public void onPortPairDeleted(PortPair portPair) {
log.debug("onPortPairDeleted");
- // TODO: Modify forwarding rule on port-pair deletion.
+ // Do nothing
}
@Override
public void onPortPairGroupCreated(PortPairGroup portPairGroup) {
log.debug("onPortPairGroupCreated");
- // TODO: Modify forwarding rule on port-pair-group creation.
+ // Do nothing
}
@Override
public void onPortPairGroupDeleted(PortPairGroup portPairGroup) {
log.debug("onPortPairGroupDeleted");
- // TODO: Modify forwarding rule on port-pair-group deletion.
+ // Do nothing
}
@Override
public void onFlowClassifierCreated(FlowClassifier flowClassifier) {
log.debug("onFlowClassifierCreated");
- // TODO: Modify forwarding rule on flow-classifier creation.
+ // Do nothing
}
@Override
public void onFlowClassifierDeleted(FlowClassifier flowClassifier) {
log.debug("onFlowClassifierDeleted");
- // TODO: Modify forwarding rule on flow-classifier deletion.
+ // Do nothing
}
@Override
public void onPortChainCreated(PortChain portChain) {
NshServicePathId nshSpi;
- log.info("onPortChainCreated");
- if (nshSpiPortChainMap.containsKey(portChain.portChainId())) {
- nshSpi = NshServicePathId.of(nshSpiPortChainMap.get(portChain.portChainId()));
- } else {
- int id = getNextNshSpi();
- if (id > MAX_NSH_SPI_ID) {
- log.error("Reached max limit of service path index."
- + "Failed to install SFC for port chain {}", portChain.portChainId().toString());
- return;
- }
- nshSpi = NshServicePathId.of(id);
- nshSpiPortChainMap.put(portChain.portChainId(), new Integer(id));
- }
+ log.info("On port chain created");
+ int spi = getNextNshSpi();
+ if (spi > MAX_NSH_SPI_ID) {
+ log.error("Reached max limit of service path index." + "Failed to install SFC for port chain {}",
+ portChain.portChainId().toString());
+ return;
+ }
+ nshSpi = NshServicePathId.of(spi);
+ nshSpiPortChainMap.put(portChain.portChainId(), new Integer(spi));
+ if (!portChainFiveTupleMap.containsKey(portChain.portChainId())) {
+ portChainFiveTupleMap.put(portChain.portChainId(), Lists.newArrayList());
+ }
// Install classifier rule to send the packet to controller
- flowClassifierInstaller.installFlowClassifier(portChain, nshSpi);
+ SfcFlowRuleInstallerImpl flowRuleInstaller = new SfcFlowRuleInstallerImpl(appId);
+ flowRuleInstaller.installFlowClassifier(portChain, nshSpi);
+
+ // Install rules for already identified five tuples.
+ List<FiveTuple> list = portChainFiveTupleMap.get(portChain.portChainId());
+ for (FiveTuple fiveTuple : list) {
+ LoadBalanceId id = loadBalanceSfc(portChain.portChainId(), fiveTuple);
+ // Get nsh service path index
+ nshSpi = NshServicePathId.of(getNshServicePathId(id, spi));
+ // download the required flow rules for classifier and
+ // forwarding
+ flowRuleInstaller.installLoadBalancedFlowRules(portChain, fiveTuple, nshSpi);
+ }
}
@Override
@@ -295,7 +303,8 @@
int nshSpiId = nshSpiPortChainMap.get(portChain.portChainId());
// Uninstall classifier rules
- flowClassifierInstaller.unInstallFlowClassifier(portChain, NshServicePathId.of(nshSpiId));
+ SfcFlowRuleInstallerImpl flowRuleInstaller = new SfcFlowRuleInstallerImpl(appId);
+ flowRuleInstaller.unInstallFlowClassifier(portChain, NshServicePathId.of(nshSpiId));
// remove from nshSpiPortChainMap and add to nshSpiIdFreeList
nshSpiPortChainMap.remove(portChain.portChainId());
nshSpiIdFreeList.add(nshSpiId);
@@ -314,9 +323,16 @@
processedIdList.add(id);
}
nshSpi = NshServicePathId.of(getNshServicePathId(id, nshSpiId));
- flowClassifierInstaller.unInstallLoadBalancedFlowClassifier(portChain, fiveTuple, nshSpi);
- serviceFunctionForwarder.unInstallLoadBalancedForwardingRule(portChain.getLoadBalancePath(fiveTuple),
- nshSpi);
+ flowRuleInstaller.unInstallLoadBalancedFlowRules(portChain, fiveTuple, nshSpi);
+ }
+
+ // Reset load for all the port pairs
+ List<PortPairGroupId> ppgIdlist = portChain.portPairGroups();
+ ListIterator<PortPairGroupId> ppgIdListIterator = ppgIdlist.listIterator();
+ while (ppgIdListIterator.hasNext()) {
+ PortPairGroupId portPairGroupId = ppgIdListIterator.next();
+ PortPairGroup ppg = portPairGroupService.getPortPairGroup(portPairGroupId);
+ ppg.resetLoad();
}
}
@@ -388,9 +404,12 @@
boolean match = false;
// Check whether protocol is set in flow classifier
if (flowClassifier.protocol() != null) {
- if ((flowClassifier.protocol().equals("TCP") && fiveTuple.protocol() == IPv4.PROTOCOL_TCP) ||
- (flowClassifier.protocol().equals("UDP") &&
- fiveTuple.protocol() == IPv4.PROTOCOL_UDP)) {
+ if ((flowClassifier.protocol().equalsIgnoreCase("TCP")
+ && fiveTuple.protocol() == IPv4.PROTOCOL_TCP)
+ || (flowClassifier.protocol().equalsIgnoreCase("UDP")
+ && fiveTuple.protocol() == IPv4.PROTOCOL_UDP)
+ || (flowClassifier.protocol().equalsIgnoreCase("ICMP")
+ && fiveTuple.protocol() == IPv4.PROTOCOL_ICMP)) {
match = true;
} else {
continue;
@@ -459,68 +478,6 @@
}
/**
- * Find the load balanced path set it to port chain for the given five tuple.
- *
- * @param portChainId port chain id
- * @param fiveTuple five tuple info
- * @return load balance id
- */
- private LoadBalanceId loadBalanceSfc(PortChainId portChainId, FiveTuple fiveTuple) {
-
- // Get the port chain
- PortChain portChain = portChainService.getPortChain(portChainId);
- List<PortPairId> loadBalancePath = Lists.newArrayList();
- LoadBalanceId id;
- int paths = portChain.getLoadBalancePathSize();
- if (paths >= MAX_LOAD_BALANCE_ID) {
- log.info("Max limit reached for load balance paths. "
- + "Reusing the created path for port chain {} with five tuple {}",
- portChainId, fiveTuple);
- id = LoadBalanceId.of((byte) ((paths + 1) % MAX_LOAD_BALANCE_ID));
- portChain.addLoadBalancePath(fiveTuple, id, portChain.getLoadBalancePath(id));
- }
-
- // Get the list of port pair groups from port chain
- Iterable<PortPairGroupId> portPairGroups = portChain.portPairGroups();
- for (final PortPairGroupId portPairGroupId : portPairGroups) {
- PortPairGroup portPairGroup = portPairGroupService.getPortPairGroup(portPairGroupId);
-
- // Get the list of port pair ids from port pair group.
- Iterable<PortPairId> portPairs = portPairGroup.portPairs();
- int minLoad = 0xFFF;
- PortPairId minLoadPortPairId = null;
- for (final PortPairId portPairId : portPairs) {
- int load = portPairGroup.getLoad(portPairId);
- if (load == 0) {
- minLoadPortPairId = portPairId;
- break;
- } else {
- // Check the port pair which has min load.
- if (load < minLoad) {
- minLoad = load;
- minLoadPortPairId = portPairId;
- }
- }
- }
- if (minLoadPortPairId != null) {
- loadBalancePath.add(minLoadPortPairId);
- portPairGroup.addLoad(minLoadPortPairId);
- }
- }
-
- // Check if the path already exists, if not create a new id
- Optional<LoadBalanceId> output = portChain.matchPath(loadBalancePath);
- if (output.isPresent()) {
- id = output.get();
- } else {
- id = LoadBalanceId.of((byte) (paths + 1));
- }
-
- portChain.addLoadBalancePath(fiveTuple, id, loadBalancePath);
- return id;
- }
-
- /**
* Get the tenant id for the given mac address.
*
* @param mac mac address
@@ -561,7 +518,7 @@
TCP tcpPacket = (TCP) ipv4Packet.getPayload();
portSrc = tcpPacket.getSourcePort();
portDst = tcpPacket.getDestinationPort();
- } else if (protocol == IPv4.PROTOCOL_UDP) {
+ } else if (protocol == IPv4.PROTOCOL_UDP) {
UDP udpPacket = (UDP) ipv4Packet.getPayload();
portSrc = udpPacket.getSourcePort();
portDst = udpPacket.getDestinationPort();
@@ -571,7 +528,7 @@
// No need to process other packets received by controller.
return;
}
- } else if (ethType == Ethernet.TYPE_IPV6) {
+ } else {
return;
}
@@ -587,11 +544,11 @@
PortChainId portChainId = findPortChainFromFiveTuple(fiveTuple);
if (portChainId == null) {
- log.error("Packet does not match with any classifier");
return;
}
// Once the 5 tuple and port chain are identified, give this input for load balancing
+ addToPortChainIdFiveTupleMap(portChainId, fiveTuple);
LoadBalanceId id = loadBalanceSfc(portChainId, fiveTuple);
// Get nsh service path index
NshServicePathId nshSpi;
@@ -611,11 +568,9 @@
}
// download the required flow rules for classifier and forwarding
// install in OVS.
- ConnectPoint connectPoint = flowClassifierInstaller.installLoadBalancedFlowClassifier(portChain,
- fiveTuple, nshSpi);
- serviceFunctionForwarder.installLoadBalancedForwardingRule(portChain.getLoadBalancePath(fiveTuple),
- nshSpi);
- sendPacket(context, connectPoint);
+ SfcFlowRuleInstallerImpl flowRuleInstaller = new SfcFlowRuleInstallerImpl(appId);
+ flowRuleInstaller.installLoadBalancedFlowRules(portChain, fiveTuple, nshSpi);
+ sendPacket(context);
}
/**
@@ -624,11 +579,13 @@
* @param context packet context
* @param connectPoint connect point of first service function
*/
- private void sendPacket(PacketContext context, ConnectPoint connectPoint) {
+ private void sendPacket(PacketContext context) {
- TrafficTreatment treatment = DefaultTrafficTreatment.builder().setOutput(connectPoint.port()).build();
- OutboundPacket packet = new DefaultOutboundPacket(connectPoint.deviceId(), treatment,
- context.inPacket().unparsed());
+ ConnectPoint sourcePoint = context.inPacket().receivedFrom();
+
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder().setOutput(sourcePoint.port()).build();
+ OutboundPacket packet = new DefaultOutboundPacket(sourcePoint.deviceId(), treatment, context.inPacket()
+ .unparsed());
packetService.emit(packet);
log.trace("Sending packet: {}", packet);
}
@@ -646,4 +603,71 @@
nshSpiNew = nshSpiNew | id.loadBalanceId();
return nshSpiNew;
}
+
+ private void addToPortChainIdFiveTupleMap(PortChainId portChainId, FiveTuple fiveTuple) {
+ List<FiveTuple> list = portChainFiveTupleMap.get(portChainId);
+ list.add(fiveTuple);
+ portChainFiveTupleMap.put(portChainId, list);
+ }
+
+ /**
+ * Find the load balanced path set it to port chain for the given five
+ * tuple.
+ *
+ * @param portChainId port chain id
+ * @param fiveTuple five tuple info
+ * @return load balance id
+ */
+ private LoadBalanceId loadBalanceSfc(PortChainId portChainId, FiveTuple fiveTuple) {
+
+ // Get the port chain
+ PortChain portChain = portChainService.getPortChain(portChainId);
+ List<PortPairId> loadBalancePath = Lists.newArrayList();
+ LoadBalanceId id;
+ int paths = portChain.getLoadBalancePathSize();
+ if (paths >= MAX_LOAD_BALANCE_ID) {
+ log.info("Max limit reached for load balance paths. "
+ + "Reusing the created path for port chain {} with five tuple {}", portChainId, fiveTuple);
+ id = LoadBalanceId.of((byte) ((paths + 1) % MAX_LOAD_BALANCE_ID));
+ portChain.addLoadBalancePath(fiveTuple, id, portChain.getLoadBalancePath(id));
+ }
+
+ // Get the list of port pair groups from port chain
+ Iterable<PortPairGroupId> portPairGroups = portChain.portPairGroups();
+ for (final PortPairGroupId portPairGroupId : portPairGroups) {
+ PortPairGroup portPairGroup = portPairGroupService.getPortPairGroup(portPairGroupId);
+
+ // Get the list of port pair ids from port pair group.
+ Iterable<PortPairId> portPairs = portPairGroup.portPairs();
+ int minLoad = 0xFFF;
+ PortPairId minLoadPortPairId = null;
+ for (final PortPairId portPairId : portPairs) {
+ int load = portPairGroup.getLoad(portPairId);
+ if (load == 0) {
+ minLoadPortPairId = portPairId;
+ break;
+ } else {
+ // Check the port pair which has min load.
+ if (load < minLoad) {
+ minLoad = load;
+ minLoadPortPairId = portPairId;
+ }
+ }
+ }
+ if (minLoadPortPairId != null) {
+ loadBalancePath.add(minLoadPortPairId);
+ portPairGroup.addLoad(minLoadPortPairId);
+ }
+ }
+
+ // Check if the path already exists, if not create a new id
+ id = portChain.matchPath(loadBalancePath);
+ if (id == null) {
+ id = LoadBalanceId.of((byte) (paths + 1));
+ }
+
+ portChain.addLoadBalancePath(fiveTuple, id, loadBalancePath);
+ return id;
+ }
+
}