[ONOS-3831,ONOS-3836] Load balance algorithm for sfc

Change-Id: I48a428587420ce6d782c128b835b5bb90e0cacfe
diff --git a/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/NshSpiIdGenerators.java b/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/NshSpiIdGenerators.java
deleted file mode 100644
index 1dbe8c8..0000000
--- a/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/NshSpiIdGenerators.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.sfc.manager;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Unique NSH SPI Id generator for NSH header.
- */
-public final class NshSpiIdGenerators {
-
-    private static final AtomicInteger NSH_SPI_ID_GEN = new AtomicInteger();
-    private static final int MAX_NSH_SPI_ID = 0x7FFFFFFF;
-    private static int nshSpiId;
-
-    /**
-     * Default constructor.
-     */
-    private NshSpiIdGenerators() {
-    }
-
-    /**
-     * Get the next NSH SPI id.
-     *
-     * @return NSH SPI id
-     */
-    public static int create() {
-        do {
-            if (nshSpiId >= MAX_NSH_SPI_ID) {
-                if (NSH_SPI_ID_GEN.get() >= MAX_NSH_SPI_ID) {
-                    NSH_SPI_ID_GEN.set(0);
-                }
-            }
-            nshSpiId = NSH_SPI_ID_GEN.incrementAndGet();
-        } while (nshSpiId > MAX_NSH_SPI_ID);
-        return nshSpiId;
-    }
-}
diff --git a/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/impl/SfcManager.java b/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/impl/SfcManager.java
index ae6dfad..155d1d0 100644
--- a/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/impl/SfcManager.java
+++ b/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/impl/SfcManager.java
@@ -18,9 +18,10 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -38,8 +39,14 @@
 import org.onlab.util.KryoNamespace;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
+import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.NshServicePathId;
 import org.onosproject.net.PortNumber;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketService;
@@ -47,13 +54,19 @@
 import org.onosproject.sfc.forwarder.impl.ServiceFunctionForwarderImpl;
 import org.onosproject.sfc.installer.FlowClassifierInstallerService;
 import org.onosproject.sfc.installer.impl.FlowClassifierInstallerImpl;
-import org.onosproject.sfc.manager.NshSpiIdGenerators;
 import org.onosproject.sfc.manager.SfcService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
 import org.onosproject.vtnrsc.DefaultFiveTuple;
 import org.onosproject.vtnrsc.FiveTuple;
 import org.onosproject.vtnrsc.FixedIp;
 import org.onosproject.vtnrsc.FlowClassifier;
 import org.onosproject.vtnrsc.FlowClassifierId;
+import org.onosproject.vtnrsc.LoadBalanceId;
 import org.onosproject.vtnrsc.PortChain;
 import org.onosproject.vtnrsc.PortChainId;
 import org.onosproject.vtnrsc.PortPair;
@@ -68,10 +81,13 @@
 import org.onosproject.vtnrsc.event.VtnRscListener;
 import org.onosproject.vtnrsc.flowclassifier.FlowClassifierService;
 import org.onosproject.vtnrsc.portchain.PortChainService;
+import org.onosproject.vtnrsc.portpairgroup.PortPairGroupService;
 import org.onosproject.vtnrsc.service.VtnRscService;
 import org.onosproject.vtnrsc.virtualport.VirtualPortService;
 import org.slf4j.Logger;
 
+import com.google.common.collect.Lists;
+
 /**
  * Provides implementation of SFC Service.
  */
@@ -80,9 +96,13 @@
 public class SfcManager implements SfcService {
 
     private final Logger log = getLogger(getClass());
+
+    private String nshSpiIdTopic = "nsh-spi-id";
     private static final String APP_ID = "org.onosproject.app.vtn";
     private static final int SFC_PRIORITY = 1000;
     private static final int NULL_PORT = 0;
+    private static final int MAX_NSH_SPI_ID = 0x7FFFF;
+    private static final int MAX_LOAD_BALANCE_ID = 0x20;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected VtnRscService vtnRscService;
@@ -97,26 +117,34 @@
     protected PortChainService portChainService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected PortPairGroupService portPairGroupService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected FlowClassifierService flowClassifierService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected VirtualPortService virtualPortService;
 
-    private SfcPacketProcessor processor = new SfcPacketProcessor();
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    protected SfcPacketProcessor processor = new SfcPacketProcessor();
 
     protected ApplicationId appId;
-    private ServiceFunctionForwarderService serviceFunctionForwarderService;
-    private FlowClassifierInstallerService flowClassifierInstallerService;
+    protected ServiceFunctionForwarderService serviceFunctionForwarder;
+    protected FlowClassifierInstallerService flowClassifierInstaller;
+    protected IdGenerator nshSpiIdGenerator;
+    protected EventuallyConsistentMap<PortChainId, Integer> nshSpiPortChainMap;
+    protected DistributedSet<Integer> nshSpiIdFreeList;
 
     private final VtnRscListener vtnRscListener = new InnerVtnRscListener();
 
-    private ConcurrentMap<PortChainId, NshServicePathId> nshSpiPortChainMap = new ConcurrentHashMap<>();
-
     @Activate
     public void activate() {
         appId = coreService.registerApplication(APP_ID);
-        serviceFunctionForwarderService = new ServiceFunctionForwarderImpl(appId);
-        flowClassifierInstallerService = new FlowClassifierInstallerImpl(appId);
+        serviceFunctionForwarder = new ServiceFunctionForwarderImpl(appId);
+        flowClassifierInstaller = new FlowClassifierInstallerImpl(appId);
+        nshSpiIdGenerator = coreService.getIdGenerator(nshSpiIdTopic);
 
         vtnRscService.addListener(vtnRscListener);
 
@@ -127,6 +155,18 @@
                 .register(FlowClassifierId.class)
                 .register(PortChainId.class);
 
+        nshSpiPortChainMap = storageService.<PortChainId, Integer>eventuallyConsistentMapBuilder()
+                .withName("nshSpiPortChainMap")
+                .withSerializer(serializer)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .build();
+
+        nshSpiIdFreeList = storageService.<Integer>setBuilder()
+                .withName("nshSpiIdDeletedList")
+                .withSerializer(Serializer.using(KryoNamespaces.API))
+                .build()
+                .asDistributedSet();
+
         packetService.addProcessor(processor, PacketProcessor.director(SFC_PRIORITY));
         log.info("Started");
     }
@@ -230,15 +270,20 @@
         NshServicePathId nshSpi;
         log.info("onPortChainCreated");
         if (nshSpiPortChainMap.containsKey(portChain.portChainId())) {
-            nshSpi = nshSpiPortChainMap.get(portChain.portChainId());
+            nshSpi = NshServicePathId.of(nshSpiPortChainMap.get(portChain.portChainId()));
         } else {
-            nshSpi = NshServicePathId.of(NshSpiIdGenerators.create());
-            nshSpiPortChainMap.put(portChain.portChainId(), nshSpi);
+            int id = getNextNshSpi();
+            if (id > MAX_NSH_SPI_ID) {
+                log.error("Reached max limit of service path index."
+                        + "Failed to install SFC for port chain {}", portChain.portChainId().toString());
+                return;
+            }
+            nshSpi = NshServicePathId.of(id);
+            nshSpiPortChainMap.put(portChain.portChainId(), new Integer(id));
         }
 
-        // install in OVS.
-        flowClassifierInstallerService.installFlowClassifier(portChain, nshSpi);
-        serviceFunctionForwarderService.installForwardingRule(portChain, nshSpi);
+        // Install classifier rule to send the packet to controller
+        flowClassifierInstaller.installFlowClassifier(portChain, nshSpi);
     }
 
     @Override
@@ -248,13 +293,47 @@
             throw new ItemNotFoundException("Unable to find NSH SPI");
         }
 
-        NshServicePathId nshSpi = nshSpiPortChainMap.get(portChain.portChainId());
-        // uninstall from OVS.
-        flowClassifierInstallerService.unInstallFlowClassifier(portChain, nshSpi);
-        serviceFunctionForwarderService.unInstallForwardingRule(portChain, nshSpi);
+        int nshSpiId = nshSpiPortChainMap.get(portChain.portChainId());
+        // Uninstall classifier rules
+        flowClassifierInstaller.unInstallFlowClassifier(portChain, NshServicePathId.of(nshSpiId));
+        // remove from nshSpiPortChainMap and add to nshSpiIdFreeList
+        nshSpiPortChainMap.remove(portChain.portChainId());
+        nshSpiIdFreeList.add(nshSpiId);
 
-        // remove SPI. No longer it will be used.
-        nshSpiPortChainMap.remove(nshSpi);
+        // Uninstall load balanced classifier and forwarding rules.
+        NshServicePathId nshSpi;
+        LoadBalanceId id;
+        List<LoadBalanceId> processedIdList = Lists.newArrayList();
+        Set<FiveTuple> fiveTupleSet = portChain.getLoadBalanceIdMapKeys();
+        for (FiveTuple fiveTuple : fiveTupleSet) {
+            id = portChain.getLoadBalanceId(fiveTuple);
+            if (processedIdList.contains(id)) {
+                // multiple five tuple can have single path.
+                continue;
+            } else {
+                processedIdList.add(id);
+            }
+            nshSpi = NshServicePathId.of(getNshServicePathId(id, nshSpiId));
+            flowClassifierInstaller.unInstallLoadBalancedFlowClassifier(portChain, fiveTuple, nshSpi);
+            serviceFunctionForwarder.unInstallLoadBalancedForwardingRule(portChain.getLoadBalancePath(fiveTuple),
+                                                                         nshSpi);
+        }
+    }
+
+    /**
+     * Get next nsh service path identifier.
+     *
+     * @return value of service path identifier
+     */
+    int getNextNshSpi() {
+        // If there is any free id use it. Otherwise generate new id.
+        if (nshSpiIdFreeList.isEmpty()) {
+            return (int) nshSpiIdGenerator.getNewId();
+        }
+        Iterator<Integer> it = nshSpiIdFreeList.iterator();
+        Integer value = it.next();
+        nshSpiIdFreeList.remove(value);
+        return value;
     }
 
     private class SfcPacketProcessor implements PacketProcessor {
@@ -296,6 +375,10 @@
             // Identify the port chain to which the packet belongs
             for (final PortChain portChain : portChains) {
 
+                if (!portChain.tenantId().equals(fiveTuple.tenantId())) {
+                    continue;
+                }
+
                 Iterable<FlowClassifierId> flowClassifiers = portChain.flowClassifiers();
 
                 // One port chain can have multiple flow classifiers.
@@ -375,6 +458,67 @@
             return portChainId;
         }
 
+        /**
+         * Find the load balanced path set it to port chain for the given five tuple.
+         *
+         * @param portChainId port chain id
+         * @param fiveTuple five tuple info
+         * @return load balance id
+         */
+        private LoadBalanceId loadBalanceSfc(PortChainId portChainId, FiveTuple fiveTuple) {
+
+            // Get the port chain
+            PortChain portChain = portChainService.getPortChain(portChainId);
+            List<PortPairId> loadBalancePath = Lists.newArrayList();
+            LoadBalanceId id;
+            int paths = portChain.getLoadBalancePathSize();
+            if (paths >= MAX_LOAD_BALANCE_ID) {
+                log.info("Max limit reached for load balance paths. "
+                        + "Reusing the created path for port chain {} with five tuple {}",
+                        portChainId, fiveTuple);
+                id = LoadBalanceId.of((byte) ((paths + 1) % MAX_LOAD_BALANCE_ID));
+                portChain.addLoadBalancePath(fiveTuple, id, portChain.getLoadBalancePath(id));
+            }
+
+            // Get the list of port pair groups from port chain
+            Iterable<PortPairGroupId> portPairGroups = portChain.portPairGroups();
+            for (final PortPairGroupId portPairGroupId : portPairGroups) {
+                PortPairGroup portPairGroup = portPairGroupService.getPortPairGroup(portPairGroupId);
+
+                // Get the list of port pair ids from port pair group.
+                Iterable<PortPairId> portPairs = portPairGroup.portPairs();
+                int minLoad = 0xFFF;
+                PortPairId minLoadPortPairId = null;
+                for (final PortPairId portPairId : portPairs) {
+                    int load = portPairGroup.getLoad(portPairId);
+                    if (load == 0) {
+                        minLoadPortPairId = portPairId;
+                        break;
+                    } else {
+                        // Check the port pair which has min load.
+                        if (load < minLoad) {
+                            minLoad = load;
+                            minLoadPortPairId = portPairId;
+                        }
+                    }
+                }
+                if (minLoadPortPairId != null) {
+                    loadBalancePath.add(minLoadPortPairId);
+                    portPairGroup.addLoad(minLoadPortPairId);
+                }
+            }
+
+            // Check if the path already exists, if not create a new id
+            Optional<LoadBalanceId> output = portChain.matchPath(loadBalancePath);
+            if (output.isPresent()) {
+                id = output.get();
+            } else {
+                id = LoadBalanceId.of((byte) (paths + 1));
+            }
+
+            portChain.addLoadBalancePath(fiveTuple, id, loadBalancePath);
+            return id;
+        }
 
         /**
          * Get the tenant id for the given mac address.
@@ -443,9 +587,59 @@
                 return;
             }
 
-            // TODO
+            // Once the 5 tuple and port chain are identified, give this input for load balancing
+            LoadBalanceId id = loadBalanceSfc(portChainId, fiveTuple);
+            // Get nsh service path index
+            NshServicePathId nshSpi;
+            PortChain portChain = portChainService.getPortChain(portChainId);
+            if (nshSpiPortChainMap.containsKey(portChain.portChainId())) {
+                int nshSpiId = nshSpiPortChainMap.get(portChain.portChainId());
+                nshSpi = NshServicePathId.of(getNshServicePathId(id, nshSpiId));
+            } else {
+                int nshSpiId = getNextNshSpi();
+                if (nshSpiId > MAX_NSH_SPI_ID) {
+                    log.error("Reached max limit of service path index."
+                            + "Failed to install SFC for port chain {}", portChain.portChainId());
+                    return;
+                }
+                nshSpi = NshServicePathId.of(getNshServicePathId(id, nshSpiId));
+                nshSpiPortChainMap.put(portChain.portChainId(), new Integer(nshSpiId));
+            }
             // download the required flow rules for classifier and forwarding
-            // resend the packet back to classifier
+            // install in OVS.
+            ConnectPoint connectPoint = flowClassifierInstaller.installLoadBalancedFlowClassifier(portChain,
+                                                                                                  fiveTuple, nshSpi);
+            serviceFunctionForwarder.installLoadBalancedForwardingRule(portChain.getLoadBalancePath(fiveTuple),
+                                                                       nshSpi);
+            sendPacket(context, connectPoint);
         }
+
+        /**
+         * Send packet back to classifier.
+         *
+         * @param context packet context
+         * @param connectPoint connect point of first service function
+         */
+        private void sendPacket(PacketContext context, ConnectPoint connectPoint) {
+
+            TrafficTreatment treatment = DefaultTrafficTreatment.builder().setOutput(connectPoint.port()).build();
+            OutboundPacket packet = new DefaultOutboundPacket(connectPoint.deviceId(), treatment,
+                                                              context.inPacket().unparsed());
+            packetService.emit(packet);
+            log.trace("Sending packet: {}", packet);
+        }
+    }
+
+    /**
+     * Encapsulate 5 bit load balance id to nsh spi.
+     *
+     * @param id load balance identifier
+     * @param nshSpiId nsh service path index
+     * @return updated service path index
+     */
+    protected int getNshServicePathId(LoadBalanceId id, int nshSpiId) {
+        int nshSpiNew = nshSpiId << 5;
+        nshSpiNew = nshSpiNew | id.loadBalanceId();
+        return nshSpiNew;
     }
 }
diff --git a/apps/vtn/vtnrsc/src/main/java/org/onosproject/vtnrsc/DefaultPortChain.java b/apps/vtn/vtnrsc/src/main/java/org/onosproject/vtnrsc/DefaultPortChain.java
index d2c2e2e..750824f 100644
--- a/apps/vtn/vtnrsc/src/main/java/org/onosproject/vtnrsc/DefaultPortChain.java
+++ b/apps/vtn/vtnrsc/src/main/java/org/onosproject/vtnrsc/DefaultPortChain.java
@@ -142,6 +142,14 @@
     }
 
     @Override
+    public int getLoadBalancePathSize() {
+        if (sfcLoadBalanceIdMap.isEmpty()) {
+            return 0;
+        }
+        return sfcLoadBalanceIdMap.size();
+    }
+
+    @Override
     public Optional<LoadBalanceId> matchPath(List<PortPairId> path) {
 
         LoadBalanceId id = null;
diff --git a/apps/vtn/vtnrsc/src/main/java/org/onosproject/vtnrsc/PortChain.java b/apps/vtn/vtnrsc/src/main/java/org/onosproject/vtnrsc/PortChain.java
index ba87010..e89a22e 100644
--- a/apps/vtn/vtnrsc/src/main/java/org/onosproject/vtnrsc/PortChain.java
+++ b/apps/vtn/vtnrsc/src/main/java/org/onosproject/vtnrsc/PortChain.java
@@ -113,6 +113,13 @@
     List<PortPairId> getLoadBalancePath(FiveTuple fiveTuple);
 
     /**
+     * Get the no of load balance paths created.
+     *
+     * @return size of load balanced paths
+     */
+    int getLoadBalancePathSize();
+
+    /**
      * Match the given path with existing load balanced paths.
      *
      * @param path load balanced path
diff --git a/apps/vtn/vtnweb/src/test/java/org/onosproject/vtnweb/resources/PortChainResourceTest.java b/apps/vtn/vtnweb/src/test/java/org/onosproject/vtnweb/resources/PortChainResourceTest.java
index b172c09..b4447f1 100644
--- a/apps/vtn/vtnweb/src/test/java/org/onosproject/vtnweb/resources/PortChainResourceTest.java
+++ b/apps/vtn/vtnweb/src/test/java/org/onosproject/vtnweb/resources/PortChainResourceTest.java
@@ -142,38 +142,37 @@
 
         @Override
         public void addLoadBalancePath(FiveTuple fiveTuple, LoadBalanceId id, List<PortPairId> path) {
-            // TODO Auto-generated method stub
         }
 
         @Override
         public LoadBalanceId getLoadBalanceId(FiveTuple fiveTuple) {
-            // TODO Auto-generated method stub
             return null;
         }
 
         @Override
         public Set<FiveTuple> getLoadBalanceIdMapKeys() {
-            // TODO Auto-generated method stub
             return null;
         }
 
         @Override
         public List<PortPairId> getLoadBalancePath(LoadBalanceId id) {
-            // TODO Auto-generated method stub
             return null;
         }
 
         @Override
         public List<PortPairId> getLoadBalancePath(FiveTuple fiveTuple) {
-            // TODO Auto-generated method stub
             return null;
         }
 
         @Override
         public Optional<LoadBalanceId> matchPath(List<PortPairId> path) {
-            // TODO Auto-generated method stub
             return null;
         }
+
+        @Override
+        public int getLoadBalancePathSize() {
+            return 0;
+        }
     }
 
     /**