[ONOS-3831,ONOS-3836] Load balance algorithm for sfc
Change-Id: I48a428587420ce6d782c128b835b5bb90e0cacfe
diff --git a/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/NshSpiIdGenerators.java b/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/NshSpiIdGenerators.java
deleted file mode 100644
index 1dbe8c8..0000000
--- a/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/NshSpiIdGenerators.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.sfc.manager;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Unique NSH SPI Id generator for NSH header.
- */
-public final class NshSpiIdGenerators {
-
- private static final AtomicInteger NSH_SPI_ID_GEN = new AtomicInteger();
- private static final int MAX_NSH_SPI_ID = 0x7FFFFFFF;
- private static int nshSpiId;
-
- /**
- * Default constructor.
- */
- private NshSpiIdGenerators() {
- }
-
- /**
- * Get the next NSH SPI id.
- *
- * @return NSH SPI id
- */
- public static int create() {
- do {
- if (nshSpiId >= MAX_NSH_SPI_ID) {
- if (NSH_SPI_ID_GEN.get() >= MAX_NSH_SPI_ID) {
- NSH_SPI_ID_GEN.set(0);
- }
- }
- nshSpiId = NSH_SPI_ID_GEN.incrementAndGet();
- } while (nshSpiId > MAX_NSH_SPI_ID);
- return nshSpiId;
- }
-}
diff --git a/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/impl/SfcManager.java b/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/impl/SfcManager.java
index ae6dfad..155d1d0 100644
--- a/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/impl/SfcManager.java
+++ b/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/impl/SfcManager.java
@@ -18,9 +18,10 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -38,8 +39,14 @@
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
+import org.onosproject.net.ConnectPoint;
import org.onosproject.net.NshServicePathId;
import org.onosproject.net.PortNumber;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
@@ -47,13 +54,19 @@
import org.onosproject.sfc.forwarder.impl.ServiceFunctionForwarderImpl;
import org.onosproject.sfc.installer.FlowClassifierInstallerService;
import org.onosproject.sfc.installer.impl.FlowClassifierInstallerImpl;
-import org.onosproject.sfc.manager.NshSpiIdGenerators;
import org.onosproject.sfc.manager.SfcService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
import org.onosproject.vtnrsc.DefaultFiveTuple;
import org.onosproject.vtnrsc.FiveTuple;
import org.onosproject.vtnrsc.FixedIp;
import org.onosproject.vtnrsc.FlowClassifier;
import org.onosproject.vtnrsc.FlowClassifierId;
+import org.onosproject.vtnrsc.LoadBalanceId;
import org.onosproject.vtnrsc.PortChain;
import org.onosproject.vtnrsc.PortChainId;
import org.onosproject.vtnrsc.PortPair;
@@ -68,10 +81,13 @@
import org.onosproject.vtnrsc.event.VtnRscListener;
import org.onosproject.vtnrsc.flowclassifier.FlowClassifierService;
import org.onosproject.vtnrsc.portchain.PortChainService;
+import org.onosproject.vtnrsc.portpairgroup.PortPairGroupService;
import org.onosproject.vtnrsc.service.VtnRscService;
import org.onosproject.vtnrsc.virtualport.VirtualPortService;
import org.slf4j.Logger;
+import com.google.common.collect.Lists;
+
/**
* Provides implementation of SFC Service.
*/
@@ -80,9 +96,13 @@
public class SfcManager implements SfcService {
private final Logger log = getLogger(getClass());
+
+ private String nshSpiIdTopic = "nsh-spi-id";
private static final String APP_ID = "org.onosproject.app.vtn";
private static final int SFC_PRIORITY = 1000;
private static final int NULL_PORT = 0;
+ private static final int MAX_NSH_SPI_ID = 0x7FFFF;
+ private static final int MAX_LOAD_BALANCE_ID = 0x20;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected VtnRscService vtnRscService;
@@ -97,26 +117,34 @@
protected PortChainService portChainService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected PortPairGroupService portPairGroupService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowClassifierService flowClassifierService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected VirtualPortService virtualPortService;
- private SfcPacketProcessor processor = new SfcPacketProcessor();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ protected SfcPacketProcessor processor = new SfcPacketProcessor();
protected ApplicationId appId;
- private ServiceFunctionForwarderService serviceFunctionForwarderService;
- private FlowClassifierInstallerService flowClassifierInstallerService;
+ protected ServiceFunctionForwarderService serviceFunctionForwarder;
+ protected FlowClassifierInstallerService flowClassifierInstaller;
+ protected IdGenerator nshSpiIdGenerator;
+ protected EventuallyConsistentMap<PortChainId, Integer> nshSpiPortChainMap;
+ protected DistributedSet<Integer> nshSpiIdFreeList;
private final VtnRscListener vtnRscListener = new InnerVtnRscListener();
- private ConcurrentMap<PortChainId, NshServicePathId> nshSpiPortChainMap = new ConcurrentHashMap<>();
-
@Activate
public void activate() {
appId = coreService.registerApplication(APP_ID);
- serviceFunctionForwarderService = new ServiceFunctionForwarderImpl(appId);
- flowClassifierInstallerService = new FlowClassifierInstallerImpl(appId);
+ serviceFunctionForwarder = new ServiceFunctionForwarderImpl(appId);
+ flowClassifierInstaller = new FlowClassifierInstallerImpl(appId);
+ nshSpiIdGenerator = coreService.getIdGenerator(nshSpiIdTopic);
vtnRscService.addListener(vtnRscListener);
@@ -127,6 +155,18 @@
.register(FlowClassifierId.class)
.register(PortChainId.class);
+ nshSpiPortChainMap = storageService.<PortChainId, Integer>eventuallyConsistentMapBuilder()
+ .withName("nshSpiPortChainMap")
+ .withSerializer(serializer)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
+ .build();
+
+ nshSpiIdFreeList = storageService.<Integer>setBuilder()
+ .withName("nshSpiIdDeletedList")
+ .withSerializer(Serializer.using(KryoNamespaces.API))
+ .build()
+ .asDistributedSet();
+
packetService.addProcessor(processor, PacketProcessor.director(SFC_PRIORITY));
log.info("Started");
}
@@ -230,15 +270,20 @@
NshServicePathId nshSpi;
log.info("onPortChainCreated");
if (nshSpiPortChainMap.containsKey(portChain.portChainId())) {
- nshSpi = nshSpiPortChainMap.get(portChain.portChainId());
+ nshSpi = NshServicePathId.of(nshSpiPortChainMap.get(portChain.portChainId()));
} else {
- nshSpi = NshServicePathId.of(NshSpiIdGenerators.create());
- nshSpiPortChainMap.put(portChain.portChainId(), nshSpi);
+ int id = getNextNshSpi();
+ if (id > MAX_NSH_SPI_ID) {
+ log.error("Reached max limit of service path index."
+ + "Failed to install SFC for port chain {}", portChain.portChainId().toString());
+ return;
+ }
+ nshSpi = NshServicePathId.of(id);
+ nshSpiPortChainMap.put(portChain.portChainId(), new Integer(id));
}
- // install in OVS.
- flowClassifierInstallerService.installFlowClassifier(portChain, nshSpi);
- serviceFunctionForwarderService.installForwardingRule(portChain, nshSpi);
+ // Install classifier rule to send the packet to controller
+ flowClassifierInstaller.installFlowClassifier(portChain, nshSpi);
}
@Override
@@ -248,13 +293,47 @@
throw new ItemNotFoundException("Unable to find NSH SPI");
}
- NshServicePathId nshSpi = nshSpiPortChainMap.get(portChain.portChainId());
- // uninstall from OVS.
- flowClassifierInstallerService.unInstallFlowClassifier(portChain, nshSpi);
- serviceFunctionForwarderService.unInstallForwardingRule(portChain, nshSpi);
+ int nshSpiId = nshSpiPortChainMap.get(portChain.portChainId());
+ // Uninstall classifier rules
+ flowClassifierInstaller.unInstallFlowClassifier(portChain, NshServicePathId.of(nshSpiId));
+ // remove from nshSpiPortChainMap and add to nshSpiIdFreeList
+ nshSpiPortChainMap.remove(portChain.portChainId());
+ nshSpiIdFreeList.add(nshSpiId);
- // remove SPI. No longer it will be used.
- nshSpiPortChainMap.remove(nshSpi);
+ // Uninstall load balanced classifier and forwarding rules.
+ NshServicePathId nshSpi;
+ LoadBalanceId id;
+ List<LoadBalanceId> processedIdList = Lists.newArrayList();
+ Set<FiveTuple> fiveTupleSet = portChain.getLoadBalanceIdMapKeys();
+ for (FiveTuple fiveTuple : fiveTupleSet) {
+ id = portChain.getLoadBalanceId(fiveTuple);
+ if (processedIdList.contains(id)) {
+ // multiple five tuple can have single path.
+ continue;
+ } else {
+ processedIdList.add(id);
+ }
+ nshSpi = NshServicePathId.of(getNshServicePathId(id, nshSpiId));
+ flowClassifierInstaller.unInstallLoadBalancedFlowClassifier(portChain, fiveTuple, nshSpi);
+ serviceFunctionForwarder.unInstallLoadBalancedForwardingRule(portChain.getLoadBalancePath(fiveTuple),
+ nshSpi);
+ }
+ }
+
+ /**
+ * Get next nsh service path identifier.
+ *
+ * @return value of service path identifier
+ */
+ int getNextNshSpi() {
+ // If there is any free id use it. Otherwise generate new id.
+ if (nshSpiIdFreeList.isEmpty()) {
+ return (int) nshSpiIdGenerator.getNewId();
+ }
+ Iterator<Integer> it = nshSpiIdFreeList.iterator();
+ Integer value = it.next();
+ nshSpiIdFreeList.remove(value);
+ return value;
}
private class SfcPacketProcessor implements PacketProcessor {
@@ -296,6 +375,10 @@
// Identify the port chain to which the packet belongs
for (final PortChain portChain : portChains) {
+ if (!portChain.tenantId().equals(fiveTuple.tenantId())) {
+ continue;
+ }
+
Iterable<FlowClassifierId> flowClassifiers = portChain.flowClassifiers();
// One port chain can have multiple flow classifiers.
@@ -375,6 +458,67 @@
return portChainId;
}
+ /**
+ * Find the load balanced path set it to port chain for the given five tuple.
+ *
+ * @param portChainId port chain id
+ * @param fiveTuple five tuple info
+ * @return load balance id
+ */
+ private LoadBalanceId loadBalanceSfc(PortChainId portChainId, FiveTuple fiveTuple) {
+
+ // Get the port chain
+ PortChain portChain = portChainService.getPortChain(portChainId);
+ List<PortPairId> loadBalancePath = Lists.newArrayList();
+ LoadBalanceId id;
+ int paths = portChain.getLoadBalancePathSize();
+ if (paths >= MAX_LOAD_BALANCE_ID) {
+ log.info("Max limit reached for load balance paths. "
+ + "Reusing the created path for port chain {} with five tuple {}",
+ portChainId, fiveTuple);
+ id = LoadBalanceId.of((byte) ((paths + 1) % MAX_LOAD_BALANCE_ID));
+ portChain.addLoadBalancePath(fiveTuple, id, portChain.getLoadBalancePath(id));
+ }
+
+ // Get the list of port pair groups from port chain
+ Iterable<PortPairGroupId> portPairGroups = portChain.portPairGroups();
+ for (final PortPairGroupId portPairGroupId : portPairGroups) {
+ PortPairGroup portPairGroup = portPairGroupService.getPortPairGroup(portPairGroupId);
+
+ // Get the list of port pair ids from port pair group.
+ Iterable<PortPairId> portPairs = portPairGroup.portPairs();
+ int minLoad = 0xFFF;
+ PortPairId minLoadPortPairId = null;
+ for (final PortPairId portPairId : portPairs) {
+ int load = portPairGroup.getLoad(portPairId);
+ if (load == 0) {
+ minLoadPortPairId = portPairId;
+ break;
+ } else {
+ // Check the port pair which has min load.
+ if (load < minLoad) {
+ minLoad = load;
+ minLoadPortPairId = portPairId;
+ }
+ }
+ }
+ if (minLoadPortPairId != null) {
+ loadBalancePath.add(minLoadPortPairId);
+ portPairGroup.addLoad(minLoadPortPairId);
+ }
+ }
+
+ // Check if the path already exists, if not create a new id
+ Optional<LoadBalanceId> output = portChain.matchPath(loadBalancePath);
+ if (output.isPresent()) {
+ id = output.get();
+ } else {
+ id = LoadBalanceId.of((byte) (paths + 1));
+ }
+
+ portChain.addLoadBalancePath(fiveTuple, id, loadBalancePath);
+ return id;
+ }
/**
* Get the tenant id for the given mac address.
@@ -443,9 +587,59 @@
return;
}
- // TODO
+ // Once the 5 tuple and port chain are identified, give this input for load balancing
+ LoadBalanceId id = loadBalanceSfc(portChainId, fiveTuple);
+ // Get nsh service path index
+ NshServicePathId nshSpi;
+ PortChain portChain = portChainService.getPortChain(portChainId);
+ if (nshSpiPortChainMap.containsKey(portChain.portChainId())) {
+ int nshSpiId = nshSpiPortChainMap.get(portChain.portChainId());
+ nshSpi = NshServicePathId.of(getNshServicePathId(id, nshSpiId));
+ } else {
+ int nshSpiId = getNextNshSpi();
+ if (nshSpiId > MAX_NSH_SPI_ID) {
+ log.error("Reached max limit of service path index."
+ + "Failed to install SFC for port chain {}", portChain.portChainId());
+ return;
+ }
+ nshSpi = NshServicePathId.of(getNshServicePathId(id, nshSpiId));
+ nshSpiPortChainMap.put(portChain.portChainId(), new Integer(nshSpiId));
+ }
// download the required flow rules for classifier and forwarding
- // resend the packet back to classifier
+ // install in OVS.
+ ConnectPoint connectPoint = flowClassifierInstaller.installLoadBalancedFlowClassifier(portChain,
+ fiveTuple, nshSpi);
+ serviceFunctionForwarder.installLoadBalancedForwardingRule(portChain.getLoadBalancePath(fiveTuple),
+ nshSpi);
+ sendPacket(context, connectPoint);
}
+
+ /**
+ * Send packet back to classifier.
+ *
+ * @param context packet context
+ * @param connectPoint connect point of first service function
+ */
+ private void sendPacket(PacketContext context, ConnectPoint connectPoint) {
+
+ TrafficTreatment treatment = DefaultTrafficTreatment.builder().setOutput(connectPoint.port()).build();
+ OutboundPacket packet = new DefaultOutboundPacket(connectPoint.deviceId(), treatment,
+ context.inPacket().unparsed());
+ packetService.emit(packet);
+ log.trace("Sending packet: {}", packet);
+ }
+ }
+
+ /**
+ * Encapsulate 5 bit load balance id to nsh spi.
+ *
+ * @param id load balance identifier
+ * @param nshSpiId nsh service path index
+ * @return updated service path index
+ */
+ protected int getNshServicePathId(LoadBalanceId id, int nshSpiId) {
+ int nshSpiNew = nshSpiId << 5;
+ nshSpiNew = nshSpiNew | id.loadBalanceId();
+ return nshSpiNew;
}
}
diff --git a/apps/vtn/vtnrsc/src/main/java/org/onosproject/vtnrsc/DefaultPortChain.java b/apps/vtn/vtnrsc/src/main/java/org/onosproject/vtnrsc/DefaultPortChain.java
index d2c2e2e..750824f 100644
--- a/apps/vtn/vtnrsc/src/main/java/org/onosproject/vtnrsc/DefaultPortChain.java
+++ b/apps/vtn/vtnrsc/src/main/java/org/onosproject/vtnrsc/DefaultPortChain.java
@@ -142,6 +142,14 @@
}
@Override
+ public int getLoadBalancePathSize() {
+ if (sfcLoadBalanceIdMap.isEmpty()) {
+ return 0;
+ }
+ return sfcLoadBalanceIdMap.size();
+ }
+
+ @Override
public Optional<LoadBalanceId> matchPath(List<PortPairId> path) {
LoadBalanceId id = null;
diff --git a/apps/vtn/vtnrsc/src/main/java/org/onosproject/vtnrsc/PortChain.java b/apps/vtn/vtnrsc/src/main/java/org/onosproject/vtnrsc/PortChain.java
index ba87010..e89a22e 100644
--- a/apps/vtn/vtnrsc/src/main/java/org/onosproject/vtnrsc/PortChain.java
+++ b/apps/vtn/vtnrsc/src/main/java/org/onosproject/vtnrsc/PortChain.java
@@ -113,6 +113,13 @@
List<PortPairId> getLoadBalancePath(FiveTuple fiveTuple);
/**
+ * Get the no of load balance paths created.
+ *
+ * @return size of load balanced paths
+ */
+ int getLoadBalancePathSize();
+
+ /**
* Match the given path with existing load balanced paths.
*
* @param path load balanced path
diff --git a/apps/vtn/vtnweb/src/test/java/org/onosproject/vtnweb/resources/PortChainResourceTest.java b/apps/vtn/vtnweb/src/test/java/org/onosproject/vtnweb/resources/PortChainResourceTest.java
index b172c09..b4447f1 100644
--- a/apps/vtn/vtnweb/src/test/java/org/onosproject/vtnweb/resources/PortChainResourceTest.java
+++ b/apps/vtn/vtnweb/src/test/java/org/onosproject/vtnweb/resources/PortChainResourceTest.java
@@ -142,38 +142,37 @@
@Override
public void addLoadBalancePath(FiveTuple fiveTuple, LoadBalanceId id, List<PortPairId> path) {
- // TODO Auto-generated method stub
}
@Override
public LoadBalanceId getLoadBalanceId(FiveTuple fiveTuple) {
- // TODO Auto-generated method stub
return null;
}
@Override
public Set<FiveTuple> getLoadBalanceIdMapKeys() {
- // TODO Auto-generated method stub
return null;
}
@Override
public List<PortPairId> getLoadBalancePath(LoadBalanceId id) {
- // TODO Auto-generated method stub
return null;
}
@Override
public List<PortPairId> getLoadBalancePath(FiveTuple fiveTuple) {
- // TODO Auto-generated method stub
return null;
}
@Override
public Optional<LoadBalanceId> matchPath(List<PortPairId> path) {
- // TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public int getLoadBalancePathSize() {
+ return 0;
+ }
}
/**