[ONOS-4612]Update SFC flows inline with the Official OVS NSH patch

Change-Id: If58517841096a939860d88aa78eca7cae46b9935
diff --git a/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/impl/SfcManager.java b/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/impl/SfcManager.java
index 0d7702c..3292e23 100644
--- a/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/impl/SfcManager.java
+++ b/apps/vtn/sfcmgr/src/main/java/org/onosproject/sfc/manager/impl/SfcManager.java
@@ -20,8 +20,9 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
+import java.util.ListIterator;
 import java.util.Set;
+import java.util.UUID;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -50,10 +51,7 @@
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketService;
-import org.onosproject.sfc.forwarder.ServiceFunctionForwarderService;
-import org.onosproject.sfc.forwarder.impl.ServiceFunctionForwarderImpl;
-import org.onosproject.sfc.installer.FlowClassifierInstallerService;
-import org.onosproject.sfc.installer.impl.FlowClassifierInstallerImpl;
+import org.onosproject.sfc.installer.impl.SfcFlowRuleInstallerImpl;
 import org.onosproject.sfc.manager.SfcService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.DistributedSet;
@@ -77,7 +75,6 @@
 import org.onosproject.vtnrsc.VirtualPort;
 import org.onosproject.vtnrsc.VirtualPortId;
 import org.onosproject.vtnrsc.event.VtnRscEvent;
-import org.onosproject.vtnrsc.event.VtnRscEventFeedback;
 import org.onosproject.vtnrsc.event.VtnRscListener;
 import org.onosproject.vtnrsc.flowclassifier.FlowClassifierService;
 import org.onosproject.vtnrsc.portchain.PortChainService;
@@ -100,7 +97,6 @@
     private String nshSpiIdTopic = "nsh-spi-id";
     private static final String APP_ID = "org.onosproject.app.vtn";
     private static final int SFC_PRIORITY = 1000;
-    private static final int NULL_PORT = 0;
     private static final int MAX_NSH_SPI_ID = 0x7FFFF;
     private static final int MAX_LOAD_BALANCE_ID = 0x20;
 
@@ -131,10 +127,9 @@
     protected SfcPacketProcessor processor = new SfcPacketProcessor();
 
     protected ApplicationId appId;
-    protected ServiceFunctionForwarderService serviceFunctionForwarder;
-    protected FlowClassifierInstallerService flowClassifierInstaller;
     protected IdGenerator nshSpiIdGenerator;
     protected EventuallyConsistentMap<PortChainId, Integer> nshSpiPortChainMap;
+    protected EventuallyConsistentMap<PortChainId, List<FiveTuple>> portChainFiveTupleMap;
     protected DistributedSet<Integer> nshSpiIdFreeList;
 
     private final VtnRscListener vtnRscListener = new InnerVtnRscListener();
@@ -142,24 +137,22 @@
     @Activate
     public void activate() {
         appId = coreService.registerApplication(APP_ID);
-        serviceFunctionForwarder = new ServiceFunctionForwarderImpl(appId);
-        flowClassifierInstaller = new FlowClassifierInstallerImpl(appId);
         nshSpiIdGenerator = coreService.getIdGenerator(nshSpiIdTopic);
 
         vtnRscService.addListener(vtnRscListener);
 
-        KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
-                .register(TenantId.class)
-                .register(PortPairId.class)
-                .register(PortPairGroupId.class)
-                .register(FlowClassifierId.class)
-                .register(PortChainId.class);
+        KryoNamespace.Builder serializer = KryoNamespace
+                .newBuilder()
+                .register(PortChainId.class, UUID.class, FiveTuple.class, IpAddress.class, PortNumber.class,
+                          DefaultFiveTuple.class, IpAddress.Version.class, TenantId.class);
 
         nshSpiPortChainMap = storageService.<PortChainId, Integer>eventuallyConsistentMapBuilder()
-                .withName("nshSpiPortChainMap")
-                .withSerializer(serializer)
-                .withTimestampProvider((k, v) -> new WallClockTimestamp())
-                .build();
+                .withName("nshSpiPortChainMap").withSerializer(serializer)
+                .withTimestampProvider((k, v) ->new WallClockTimestamp()).build();
+
+        portChainFiveTupleMap = storageService.<PortChainId, List<FiveTuple>>eventuallyConsistentMapBuilder()
+                .withName("portChainFiveTupleMap").withSerializer(serializer)
+                .withTimestampProvider((k, v) ->new WallClockTimestamp()).build();
 
         nshSpiIdFreeList = storageService.<Integer>setBuilder()
                 .withName("nshSpiIdDeletedList")
@@ -186,43 +179,47 @@
         public void event(VtnRscEvent event) {
 
             if (VtnRscEvent.Type.PORT_PAIR_PUT == event.type()) {
-                PortPair portPair = ((VtnRscEventFeedback) event.subject()).portPair();
+                PortPair portPair = event.subject().portPair();
                 onPortPairCreated(portPair);
             } else if (VtnRscEvent.Type.PORT_PAIR_DELETE == event.type()) {
-                PortPair portPair = ((VtnRscEventFeedback) event.subject()).portPair();
+                PortPair portPair = event.subject().portPair();
                 onPortPairDeleted(portPair);
             } else if (VtnRscEvent.Type.PORT_PAIR_UPDATE == event.type()) {
-                PortPair portPair = ((VtnRscEventFeedback) event.subject()).portPair();
+                PortPair portPair = event.subject().portPair();
                 onPortPairDeleted(portPair);
                 onPortPairCreated(portPair);
             } else if (VtnRscEvent.Type.PORT_PAIR_GROUP_PUT == event.type()) {
-                PortPairGroup portPairGroup = ((VtnRscEventFeedback) event.subject()).portPairGroup();
+                PortPairGroup portPairGroup = event.subject().portPairGroup();
                 onPortPairGroupCreated(portPairGroup);
             } else if (VtnRscEvent.Type.PORT_PAIR_GROUP_DELETE == event.type()) {
-                PortPairGroup portPairGroup = ((VtnRscEventFeedback) event.subject()).portPairGroup();
+                PortPairGroup portPairGroup = event.subject().portPairGroup();
                 onPortPairGroupDeleted(portPairGroup);
             } else if (VtnRscEvent.Type.PORT_PAIR_GROUP_UPDATE == event.type()) {
-                PortPairGroup portPairGroup = ((VtnRscEventFeedback) event.subject()).portPairGroup();
+                PortPairGroup portPairGroup = event.subject().portPairGroup();
                 onPortPairGroupDeleted(portPairGroup);
                 onPortPairGroupCreated(portPairGroup);
             } else if (VtnRscEvent.Type.FLOW_CLASSIFIER_PUT == event.type()) {
-                FlowClassifier flowClassifier = ((VtnRscEventFeedback) event.subject()).flowClassifier();
+                FlowClassifier flowClassifier = event.subject().flowClassifier();
                 onFlowClassifierCreated(flowClassifier);
             } else if (VtnRscEvent.Type.FLOW_CLASSIFIER_DELETE == event.type()) {
-                FlowClassifier flowClassifier = ((VtnRscEventFeedback) event.subject()).flowClassifier();
+                FlowClassifier flowClassifier = event.subject().flowClassifier();
                 onFlowClassifierDeleted(flowClassifier);
             } else if (VtnRscEvent.Type.FLOW_CLASSIFIER_UPDATE == event.type()) {
-                FlowClassifier flowClassifier = ((VtnRscEventFeedback) event.subject()).flowClassifier();
+                FlowClassifier flowClassifier = event.subject().flowClassifier();
                 onFlowClassifierDeleted(flowClassifier);
                 onFlowClassifierCreated(flowClassifier);
             } else if (VtnRscEvent.Type.PORT_CHAIN_PUT == event.type()) {
-                PortChain portChain = (PortChain) ((VtnRscEventFeedback) event.subject()).portChain();
+                PortChain portChain = event.subject().portChain();
+                if (portChain.oldPortChain() != null) {
+                    onPortChainDeleted(portChain.oldPortChain());
+                }
                 onPortChainCreated(portChain);
             } else if (VtnRscEvent.Type.PORT_CHAIN_DELETE == event.type()) {
-                PortChain portChain = (PortChain) ((VtnRscEventFeedback) event.subject()).portChain();
+                PortChain portChain = event.subject().portChain();
                 onPortChainDeleted(portChain);
+                portChainFiveTupleMap.remove(portChain.portChainId());
             } else if (VtnRscEvent.Type.PORT_CHAIN_UPDATE == event.type()) {
-                PortChain portChain = (PortChain) ((VtnRscEventFeedback) event.subject()).portChain();
+                PortChain portChain = event.subject().portChain();
                 onPortChainDeleted(portChain);
                 onPortChainCreated(portChain);
             }
@@ -232,58 +229,69 @@
     @Override
     public void onPortPairCreated(PortPair portPair) {
         log.debug("onPortPairCreated");
-        // TODO: Modify forwarding rule on port-pair creation.
+        // Do nothing
     }
 
     @Override
     public void onPortPairDeleted(PortPair portPair) {
         log.debug("onPortPairDeleted");
-        // TODO: Modify forwarding rule on port-pair deletion.
+        // Do nothing
     }
 
     @Override
     public void onPortPairGroupCreated(PortPairGroup portPairGroup) {
         log.debug("onPortPairGroupCreated");
-        // TODO: Modify forwarding rule on port-pair-group creation.
+        // Do nothing
     }
 
     @Override
     public void onPortPairGroupDeleted(PortPairGroup portPairGroup) {
         log.debug("onPortPairGroupDeleted");
-        // TODO: Modify forwarding rule on port-pair-group deletion.
+        // Do nothing
     }
 
     @Override
     public void onFlowClassifierCreated(FlowClassifier flowClassifier) {
         log.debug("onFlowClassifierCreated");
-        // TODO: Modify forwarding rule on flow-classifier creation.
+        // Do nothing
     }
 
     @Override
     public void onFlowClassifierDeleted(FlowClassifier flowClassifier) {
         log.debug("onFlowClassifierDeleted");
-        // TODO: Modify forwarding rule on flow-classifier deletion.
+        // Do nothing
     }
 
     @Override
     public void onPortChainCreated(PortChain portChain) {
         NshServicePathId nshSpi;
-        log.info("onPortChainCreated");
-        if (nshSpiPortChainMap.containsKey(portChain.portChainId())) {
-            nshSpi = NshServicePathId.of(nshSpiPortChainMap.get(portChain.portChainId()));
-        } else {
-            int id = getNextNshSpi();
-            if (id > MAX_NSH_SPI_ID) {
-                log.error("Reached max limit of service path index."
-                        + "Failed to install SFC for port chain {}", portChain.portChainId().toString());
-                return;
-            }
-            nshSpi = NshServicePathId.of(id);
-            nshSpiPortChainMap.put(portChain.portChainId(), new Integer(id));
-        }
+        log.info("On port chain created");
 
+        int spi = getNextNshSpi();
+        if (spi > MAX_NSH_SPI_ID) {
+            log.error("Reached max limit of service path index." + "Failed to install SFC for port chain {}",
+                      portChain.portChainId().toString());
+            return;
+        }
+        nshSpi = NshServicePathId.of(spi);
+        nshSpiPortChainMap.put(portChain.portChainId(), new Integer(spi));
+        if (!portChainFiveTupleMap.containsKey(portChain.portChainId())) {
+            portChainFiveTupleMap.put(portChain.portChainId(), Lists.newArrayList());
+        }
         // Install classifier rule to send the packet to controller
-        flowClassifierInstaller.installFlowClassifier(portChain, nshSpi);
+        SfcFlowRuleInstallerImpl flowRuleInstaller = new SfcFlowRuleInstallerImpl(appId);
+        flowRuleInstaller.installFlowClassifier(portChain, nshSpi);
+
+        // Install rules for already identified five tuples.
+        List<FiveTuple> list = portChainFiveTupleMap.get(portChain.portChainId());
+        for (FiveTuple fiveTuple : list) {
+            LoadBalanceId id = loadBalanceSfc(portChain.portChainId(), fiveTuple);
+            // Get nsh service path index
+            nshSpi = NshServicePathId.of(getNshServicePathId(id, spi));
+            // download the required flow rules for classifier and
+            // forwarding
+            flowRuleInstaller.installLoadBalancedFlowRules(portChain, fiveTuple, nshSpi);
+        }
     }
 
     @Override
@@ -295,7 +303,8 @@
 
         int nshSpiId = nshSpiPortChainMap.get(portChain.portChainId());
         // Uninstall classifier rules
-        flowClassifierInstaller.unInstallFlowClassifier(portChain, NshServicePathId.of(nshSpiId));
+        SfcFlowRuleInstallerImpl flowRuleInstaller = new SfcFlowRuleInstallerImpl(appId);
+        flowRuleInstaller.unInstallFlowClassifier(portChain, NshServicePathId.of(nshSpiId));
         // remove from nshSpiPortChainMap and add to nshSpiIdFreeList
         nshSpiPortChainMap.remove(portChain.portChainId());
         nshSpiIdFreeList.add(nshSpiId);
@@ -314,9 +323,16 @@
                 processedIdList.add(id);
             }
             nshSpi = NshServicePathId.of(getNshServicePathId(id, nshSpiId));
-            flowClassifierInstaller.unInstallLoadBalancedFlowClassifier(portChain, fiveTuple, nshSpi);
-            serviceFunctionForwarder.unInstallLoadBalancedForwardingRule(portChain.getLoadBalancePath(fiveTuple),
-                                                                         nshSpi);
+            flowRuleInstaller.unInstallLoadBalancedFlowRules(portChain, fiveTuple, nshSpi);
+        }
+
+        // Reset load for all the port pairs
+        List<PortPairGroupId> ppgIdlist = portChain.portPairGroups();
+        ListIterator<PortPairGroupId> ppgIdListIterator = ppgIdlist.listIterator();
+        while (ppgIdListIterator.hasNext()) {
+            PortPairGroupId portPairGroupId = ppgIdListIterator.next();
+            PortPairGroup ppg = portPairGroupService.getPortPairGroup(portPairGroupId);
+            ppg.resetLoad();
         }
     }
 
@@ -388,9 +404,12 @@
                     boolean match = false;
                     // Check whether protocol is set in flow classifier
                     if (flowClassifier.protocol() != null) {
-                        if ((flowClassifier.protocol().equals("TCP") && fiveTuple.protocol() == IPv4.PROTOCOL_TCP) ||
-                                (flowClassifier.protocol().equals("UDP") &&
-                                        fiveTuple.protocol() == IPv4.PROTOCOL_UDP)) {
+                        if ((flowClassifier.protocol().equalsIgnoreCase("TCP")
+                                && fiveTuple.protocol() == IPv4.PROTOCOL_TCP)
+                                || (flowClassifier.protocol().equalsIgnoreCase("UDP")
+                                        && fiveTuple.protocol() == IPv4.PROTOCOL_UDP)
+                                        || (flowClassifier.protocol().equalsIgnoreCase("ICMP")
+                                                && fiveTuple.protocol() == IPv4.PROTOCOL_ICMP)) {
                             match = true;
                         } else {
                             continue;
@@ -459,68 +478,6 @@
         }
 
         /**
-         * Find the load balanced path set it to port chain for the given five tuple.
-         *
-         * @param portChainId port chain id
-         * @param fiveTuple five tuple info
-         * @return load balance id
-         */
-        private LoadBalanceId loadBalanceSfc(PortChainId portChainId, FiveTuple fiveTuple) {
-
-            // Get the port chain
-            PortChain portChain = portChainService.getPortChain(portChainId);
-            List<PortPairId> loadBalancePath = Lists.newArrayList();
-            LoadBalanceId id;
-            int paths = portChain.getLoadBalancePathSize();
-            if (paths >= MAX_LOAD_BALANCE_ID) {
-                log.info("Max limit reached for load balance paths. "
-                        + "Reusing the created path for port chain {} with five tuple {}",
-                        portChainId, fiveTuple);
-                id = LoadBalanceId.of((byte) ((paths + 1) % MAX_LOAD_BALANCE_ID));
-                portChain.addLoadBalancePath(fiveTuple, id, portChain.getLoadBalancePath(id));
-            }
-
-            // Get the list of port pair groups from port chain
-            Iterable<PortPairGroupId> portPairGroups = portChain.portPairGroups();
-            for (final PortPairGroupId portPairGroupId : portPairGroups) {
-                PortPairGroup portPairGroup = portPairGroupService.getPortPairGroup(portPairGroupId);
-
-                // Get the list of port pair ids from port pair group.
-                Iterable<PortPairId> portPairs = portPairGroup.portPairs();
-                int minLoad = 0xFFF;
-                PortPairId minLoadPortPairId = null;
-                for (final PortPairId portPairId : portPairs) {
-                    int load = portPairGroup.getLoad(portPairId);
-                    if (load == 0) {
-                        minLoadPortPairId = portPairId;
-                        break;
-                    } else {
-                        // Check the port pair which has min load.
-                        if (load < minLoad) {
-                            minLoad = load;
-                            minLoadPortPairId = portPairId;
-                        }
-                    }
-                }
-                if (minLoadPortPairId != null) {
-                    loadBalancePath.add(minLoadPortPairId);
-                    portPairGroup.addLoad(minLoadPortPairId);
-                }
-            }
-
-            // Check if the path already exists, if not create a new id
-            Optional<LoadBalanceId> output = portChain.matchPath(loadBalancePath);
-            if (output.isPresent()) {
-                id = output.get();
-            } else {
-                id = LoadBalanceId.of((byte) (paths + 1));
-            }
-
-            portChain.addLoadBalancePath(fiveTuple, id, loadBalancePath);
-            return id;
-        }
-
-        /**
          * Get the tenant id for the given mac address.
          *
          * @param mac mac address
@@ -561,7 +518,7 @@
                     TCP tcpPacket = (TCP) ipv4Packet.getPayload();
                     portSrc = tcpPacket.getSourcePort();
                     portDst = tcpPacket.getDestinationPort();
-                } else  if (protocol == IPv4.PROTOCOL_UDP) {
+                } else if (protocol == IPv4.PROTOCOL_UDP) {
                     UDP udpPacket = (UDP) ipv4Packet.getPayload();
                     portSrc = udpPacket.getSourcePort();
                     portDst = udpPacket.getDestinationPort();
@@ -571,7 +528,7 @@
                     // No need to process other packets received by controller.
                     return;
                 }
-            } else if (ethType == Ethernet.TYPE_IPV6) {
+            } else {
                 return;
             }
 
@@ -587,11 +544,11 @@
             PortChainId portChainId = findPortChainFromFiveTuple(fiveTuple);
 
             if (portChainId == null) {
-                log.error("Packet does not match with any classifier");
                 return;
             }
 
             // Once the 5 tuple and port chain are identified, give this input for load balancing
+            addToPortChainIdFiveTupleMap(portChainId, fiveTuple);
             LoadBalanceId id = loadBalanceSfc(portChainId, fiveTuple);
             // Get nsh service path index
             NshServicePathId nshSpi;
@@ -611,11 +568,9 @@
             }
             // download the required flow rules for classifier and forwarding
             // install in OVS.
-            ConnectPoint connectPoint = flowClassifierInstaller.installLoadBalancedFlowClassifier(portChain,
-                                                                                                  fiveTuple, nshSpi);
-            serviceFunctionForwarder.installLoadBalancedForwardingRule(portChain.getLoadBalancePath(fiveTuple),
-                                                                       nshSpi);
-            sendPacket(context, connectPoint);
+            SfcFlowRuleInstallerImpl flowRuleInstaller = new SfcFlowRuleInstallerImpl(appId);
+            flowRuleInstaller.installLoadBalancedFlowRules(portChain, fiveTuple, nshSpi);
+            sendPacket(context);
         }
 
         /**
@@ -624,11 +579,13 @@
          * @param context packet context
          * @param connectPoint connect point of first service function
          */
-        private void sendPacket(PacketContext context, ConnectPoint connectPoint) {
+        private void sendPacket(PacketContext context) {
 
-            TrafficTreatment treatment = DefaultTrafficTreatment.builder().setOutput(connectPoint.port()).build();
-            OutboundPacket packet = new DefaultOutboundPacket(connectPoint.deviceId(), treatment,
-                                                              context.inPacket().unparsed());
+            ConnectPoint sourcePoint = context.inPacket().receivedFrom();
+
+            TrafficTreatment treatment = DefaultTrafficTreatment.builder().setOutput(sourcePoint.port()).build();
+            OutboundPacket packet = new DefaultOutboundPacket(sourcePoint.deviceId(), treatment, context.inPacket()
+                    .unparsed());
             packetService.emit(packet);
             log.trace("Sending packet: {}", packet);
         }
@@ -646,4 +603,71 @@
         nshSpiNew = nshSpiNew | id.loadBalanceId();
         return nshSpiNew;
     }
+
+    private void addToPortChainIdFiveTupleMap(PortChainId portChainId, FiveTuple fiveTuple) {
+        List<FiveTuple> list = portChainFiveTupleMap.get(portChainId);
+        list.add(fiveTuple);
+        portChainFiveTupleMap.put(portChainId, list);
+    }
+
+    /**
+     * Find the load balanced path set it to port chain for the given five
+     * tuple.
+     *
+     * @param portChainId port chain id
+     * @param fiveTuple five tuple info
+     * @return load balance id
+     */
+    private LoadBalanceId loadBalanceSfc(PortChainId portChainId, FiveTuple fiveTuple) {
+
+        // Get the port chain
+        PortChain portChain = portChainService.getPortChain(portChainId);
+        List<PortPairId> loadBalancePath = Lists.newArrayList();
+        LoadBalanceId id;
+        int paths = portChain.getLoadBalancePathSize();
+        if (paths >= MAX_LOAD_BALANCE_ID) {
+            log.info("Max limit reached for load balance paths. "
+                    + "Reusing the created path for port chain {} with five tuple {}", portChainId, fiveTuple);
+            id = LoadBalanceId.of((byte) ((paths + 1) % MAX_LOAD_BALANCE_ID));
+            portChain.addLoadBalancePath(fiveTuple, id, portChain.getLoadBalancePath(id));
+        }
+
+        // Get the list of port pair groups from port chain
+        Iterable<PortPairGroupId> portPairGroups = portChain.portPairGroups();
+        for (final PortPairGroupId portPairGroupId : portPairGroups) {
+            PortPairGroup portPairGroup = portPairGroupService.getPortPairGroup(portPairGroupId);
+
+            // Get the list of port pair ids from port pair group.
+            Iterable<PortPairId> portPairs = portPairGroup.portPairs();
+            int minLoad = 0xFFF;
+            PortPairId minLoadPortPairId = null;
+            for (final PortPairId portPairId : portPairs) {
+                int load = portPairGroup.getLoad(portPairId);
+                if (load == 0) {
+                    minLoadPortPairId = portPairId;
+                    break;
+                } else {
+                    // Check the port pair which has min load.
+                    if (load < minLoad) {
+                        minLoad = load;
+                        minLoadPortPairId = portPairId;
+                    }
+                }
+            }
+            if (minLoadPortPairId != null) {
+                loadBalancePath.add(minLoadPortPairId);
+                portPairGroup.addLoad(minLoadPortPairId);
+            }
+        }
+
+        // Check if the path already exists, if not create a new id
+        id = portChain.matchPath(loadBalancePath);
+        if (id == null) {
+            id = LoadBalanceId.of((byte) (paths + 1));
+        }
+
+        portChain.addLoadBalancePath(fiveTuple, id, loadBalancePath);
+        return id;
+    }
+
 }