Refactored OpenstackRouting to support multiple gateway nodes

Change-Id: I6870ca9a4fd6f6b1cf2d2be72f52ef87827e1d2c
diff --git a/apps/openstacknetworking/switching/src/main/java/org/onosproject/openstacknetworking/switching/OpenstackSwitchingManager.java b/apps/openstacknetworking/switching/src/main/java/org/onosproject/openstacknetworking/switching/OpenstackSwitchingManager.java
index 55fce2c..59b41ff 100644
--- a/apps/openstacknetworking/switching/src/main/java/org/onosproject/openstacknetworking/switching/OpenstackSwitchingManager.java
+++ b/apps/openstacknetworking/switching/src/main/java/org/onosproject/openstacknetworking/switching/OpenstackSwitchingManager.java
@@ -1,319 +1,273 @@
 /*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.onosproject.openstacknetworking.switching;
 
-import com.google.common.base.Strings;
-import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.Ethernet;
 import org.onlab.packet.Ip4Address;
-import org.onlab.packet.IpPrefix;
-import org.onlab.packet.VlanId;
-import org.onosproject.core.CoreService;
-import org.onosproject.dhcp.DhcpService;
-import org.onosproject.dhcp.IpAssignment;
-import org.onosproject.mastership.MastershipService;
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.DefaultAnnotations;
-import org.onosproject.net.Device;
+import org.onlab.packet.IpAddress;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.DeviceId;
 import org.onosproject.net.Host;
-import org.onosproject.net.HostId;
-import org.onosproject.net.HostLocation;
-import org.onosproject.net.Port;
-import org.onosproject.net.device.DeviceEvent;
-import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.host.DefaultHostDescription;
-import org.onosproject.net.host.HostDescription;
-import org.onosproject.net.host.HostProvider;
-import org.onosproject.net.host.HostProviderRegistry;
-import org.onosproject.net.host.HostProviderService;
-import org.onosproject.net.host.HostService;
-import org.onosproject.net.provider.AbstractProvider;
-import org.onosproject.net.provider.ProviderId;
-import org.onosproject.openstackinterface.OpenstackInterfaceService;
-import org.onosproject.openstackinterface.OpenstackNetwork;
-import org.onosproject.openstackinterface.OpenstackPort;
-import org.onosproject.openstackinterface.OpenstackSubnet;
-import org.onosproject.openstacknode.OpenstackNode;
-import org.onosproject.openstacknode.OpenstackNodeEvent;
-import org.onosproject.openstacknode.OpenstackNodeListener;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.openstacknetworking.AbstractVmHandler;
 import org.onosproject.openstacknode.OpenstackNodeService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Date;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.Objects;
+import java.util.Optional;
 
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.dhcp.IpAssignment.AssignmentStatus.Option_RangeNotEnforced;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.net.AnnotationKeys.PORT_NAME;
 import static org.onosproject.openstacknetworking.Constants.*;
+import static org.onosproject.openstacknetworking.RulePopulatorUtil.buildExtension;
 
-@Service
-@Component(immediate = true)
 /**
- * Populates forwarding rules for VMs created by Openstack.
+ * Populates switching flow rules.
  */
-public final class OpenstackSwitchingManager extends AbstractProvider
-        implements HostProvider {
+@Component(immediate = true)
+public final class OpenstackSwitchingManager extends AbstractVmHandler {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected CoreService coreService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected HostProviderRegistry hostProviderRegistry;
+    protected FlowObjectiveService flowObjectiveService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected DhcpService dhcpService;
+    protected OpenstackNodeService nodeService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected HostService hostService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected MastershipService mastershipService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected OpenstackInterfaceService openstackService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected OpenstackNodeService openstackNodeService;
-
-    private final ExecutorService deviceEventExecutor =
-            Executors.newSingleThreadExecutor(groupedThreads("onos/openstackswitching", "device-event"));
-    private final ExecutorService configEventExecutor =
-            Executors.newSingleThreadExecutor(groupedThreads("onos/openstackswitching", "config-event"));
-    private final InternalDeviceListener internalDeviceListener = new InternalDeviceListener();
-    private final InternalOpenstackNodeListener internalNodeListener = new InternalOpenstackNodeListener();
-
-    private HostProviderService hostProvider;
-
-    /**
-     * Creates OpenStack switching host provider.
-     */
-    public OpenstackSwitchingManager() {
-        super(new ProviderId("host", APP_ID));
-    }
+    private ApplicationId appId;
 
     @Activate
     protected void activate() {
-        coreService.registerApplication(APP_ID);
-        deviceService.addListener(internalDeviceListener);
-        openstackNodeService.addListener(internalNodeListener);
-        hostProvider = hostProviderRegistry.register(this);
-
-        log.info("Started");
+        super.activate();
+        appId = coreService.registerApplication(SWITCHING_APP_ID); // app ID passed to fromApp() below
     }
 
     @Deactivate
     protected void deactivate() {
-        hostProviderRegistry.unregister(this);
-        deviceService.removeListener(internalDeviceListener);
-        openstackNodeService.removeListener(internalNodeListener);
+        super.deactivate(); // no teardown of its own; delegates to AbstractVmHandler
+    }
 
-        deviceEventExecutor.shutdown();
-        configEventExecutor.shutdown();
+    private void populateSwitchingRules(Host host) {
+        populateFlowRulesForTunnelTag(host);               // tag traffic from the VM's port with its VNI
+        populateFlowRulesForTrafficToSameCnode(host);      // output traffic for the VM's IP to its port
+        populateFlowRulesForTrafficToDifferentCnode(host); // VXLAN rules for VMs from getVmsInDifferentCnode()
 
-        log.info("Stopped");
+        log.debug("Populated switching rule for {}", host);
+    }
+
+    // Tags IPv4 traffic entering on the VM's port with the VM's tunnel ID (VNI).
+    private void populateFlowRulesForTunnelTag(Host host) {
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchInPort(host.location().port())
+                .build();
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .setTunnelId(Long.parseLong(getVni(host)))
+                .build();
+
+        ForwardingObjective fo = DefaultForwardingObjective.builder()
+                .withSelector(selector)
+                .withTreatment(treatment)
+                .withPriority(TUNNELTAG_RULE_PRIORITY)
+                .withFlag(ForwardingObjective.Flag.SPECIFIC)
+                .fromApp(appId)
+                .add();
+        flowObjectiveService.forward(host.location().deviceId(), fo);
+    }
+
+    // L2 switching case: outputs IPv4 traffic for this VM's IP and VNI to the VM's port.
+    private void populateFlowRulesForTrafficToSameCnode(Host host) {
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchIPDst(getIp(host).toIpPrefix())
+                .matchTunnelId(Long.parseLong(getVni(host)))
+                .build();
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .setOutput(host.location().port())
+                .build();
+
+        ForwardingObjective fo = DefaultForwardingObjective.builder()
+                .withSelector(selector)
+                .withTreatment(treatment)
+                .withPriority(SWITCHING_RULE_PRIORITY)
+                .withFlag(ForwardingObjective.Flag.SPECIFIC)
+                .fromApp(appId)
+                .add();
+
+        flowObjectiveService.forward(host.location().deviceId(), fo);
+    }
+
+    // Installs VXLAN rules in both directions between this VM and every VM
+    // returned by getVmsInDifferentCnode(); does nothing without a local data IP.
+    private void populateFlowRulesForTrafficToDifferentCnode(Host host) {
+        Ip4Address localVmIp = getIp(host);
+        DeviceId localDeviceId = host.location().deviceId();
+        Optional<IpAddress> localDataIp = nodeService.dataIp(localDeviceId);
+        if (!localDataIp.isPresent()) {
+            log.debug("Failed to get data IP for device {}", localDeviceId);
+            return;
+        }
+
+        String vni = getVni(host);
+        getVmsInDifferentCnode(host).forEach(remoteVm -> {
+            DeviceId remoteDeviceId = remoteVm.location().deviceId();
+            Optional<IpAddress> remoteDataIp = nodeService.dataIp(remoteDeviceId);
+            if (!remoteDataIp.isPresent()) {
+                // skip VMs whose device has no data IP
+                return;
+            }
+            // rule on the local device for traffic to the remote VM
+            setVxLanFlowRule(vni, localDeviceId,
+                    remoteDataIp.get().getIp4Address(), getIp(remoteVm));
+            // rule on the remote device for traffic to the local VM
+            setVxLanFlowRule(vni, remoteDeviceId,
+                    localDataIp.get().getIp4Address(), localVmIp);
+        });
+    }
+
+    // Matches IPv4 traffic to vmIp on the VNI; applies the remoteIp extension and outputs to the tunnel port.
+    private void setVxLanFlowRule(String vni, DeviceId deviceId, Ip4Address remoteIp,
+                                  Ip4Address vmIp) {
+        Optional<PortNumber> tunnelPort = nodeService.tunnelPort(deviceId);
+        if (!tunnelPort.isPresent()) {
+            log.warn("Failed to get tunnel port from {}", deviceId);
+            return;
+        }
+
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchTunnelId(Long.parseLong(vni))
+                .matchIPDst(vmIp.toIpPrefix())
+                .build();
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .extension(buildExtension(deviceService, deviceId, remoteIp), deviceId)
+                .setOutput(tunnelPort.get())
+                .build();
+
+        flowObjectiveService.forward(deviceId, DefaultForwardingObjective.builder()
+                .withSelector(selector)
+                .withTreatment(treatment)
+                .withPriority(SWITCHING_RULE_PRIORITY)
+                .withFlag(ForwardingObjective.Flag.SPECIFIC)
+                .fromApp(appId)
+                .add());
+    }
+
+    private void removeSwitchingRules(Host host) {
+        removeFlowRuleForTunnelTag(host);          // VNI tagging rule on the VM's port
+        removeFlowRuleForVMsInSameCnode(host);     // rule outputting traffic for the VM's IP to its port
+        removeFlowRuleForVMsInDiffrentCnode(host); // VXLAN rules for VMs from getVmsInDifferentCnode()
+
+        log.debug("Removed switching rule for {}", host);
+    }
+
+    // Removes the tunnel-tag rule; same selector and priority as populateFlowRulesForTunnelTag.
+    private void removeFlowRuleForTunnelTag(Host host) {
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchInPort(host.location().port())
+                .build();
+
+        ForwardingObjective fo = DefaultForwardingObjective.builder()
+                .withSelector(selector)
+                .withTreatment(DefaultTrafficTreatment.builder().build())
+                .withPriority(TUNNELTAG_RULE_PRIORITY)
+                .withFlag(ForwardingObjective.Flag.SPECIFIC)
+                .fromApp(appId)
+                .remove();
+
+        flowObjectiveService.forward(host.location().deviceId(), fo);
+    }
+
+    // Removes the rule added by populateFlowRulesForTrafficToSameCnode (same selector and priority).
+    private void removeFlowRuleForVMsInSameCnode(Host host) {
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchIPDst(getIp(host).toIpPrefix())
+                .matchTunnelId(Long.parseLong(getVni(host)))
+                .build();
+        ForwardingObjective fo = DefaultForwardingObjective.builder()
+                .withSelector(selector)
+                .withTreatment(DefaultTrafficTreatment.builder().build())
+                .withFlag(ForwardingObjective.Flag.SPECIFIC)
+                .withPriority(SWITCHING_RULE_PRIORITY)
+                .fromApp(appId)
+                .remove();
+        flowObjectiveService.forward(host.location().deviceId(), fo);
+    }
+
+    // Removes the VXLAN rules between the given VM and each VM from getVmsInDifferentCnode().
+    private void removeFlowRuleForVMsInDiffrentCnode(Host host) {
+        DeviceId deviceId = host.location().deviceId();
+        String vni = getVni(host);
+        boolean sameVniVmRemains = hostService.getConnectedHosts(deviceId).stream()
+                .filter(this::isValidHost).anyMatch(h -> Objects.equals(getVni(h), vni));
+        getVmsInDifferentCnode(host).forEach(remoteVm -> {
+            removeVxLanFlowRule(remoteVm.location().deviceId(), getIp(host), vni);
+            // local rules stay while another valid VM with the same VNI is on this device
+            if (!sameVniVmRemains) {
+                removeVxLanFlowRule(deviceId, getIp(remoteVm), vni);
+            }
+        });
+    }
+
+    // Removes the rule matching IPv4 traffic for vmIp on the given VNI from the device.
+    private void removeVxLanFlowRule(DeviceId deviceId, Ip4Address vmIp, String vni) {
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchTunnelId(Long.parseLong(vni))
+                .matchIPDst(vmIp.toIpPrefix())
+                .build();
+
+        ForwardingObjective fo = DefaultForwardingObjective.builder()
+                .withSelector(selector)
+                .withTreatment(DefaultTrafficTreatment.builder().build())
+                .withFlag(ForwardingObjective.Flag.SPECIFIC)
+                .withPriority(SWITCHING_RULE_PRIORITY)
+                .fromApp(appId)
+                .remove();
+        flowObjectiveService.forward(deviceId, fo);
     }
 
     @Override
-    public void triggerProbe(Host host) {
-        // no probe is required
+    protected void hostDetected(Host host) {
+        populateSwitchingRules(host); // installs the tunnel-tag and switching rules for this host
+        log.info("Added new virtual machine to switching service {}", host);
     }
 
-    private void processPortAdded(Port port) {
-        // TODO check the node state is COMPLETE
-        OpenstackPort osPort = openstackService.port(port);
-        if (osPort == null) {
-            log.warn("Failed to get OpenStack port for {}", port);
-            return;
-        }
-
-        OpenstackNetwork osNet = openstackService.network(osPort.networkId());
-        if (osNet == null) {
-            log.warn("Failed to get OpenStack network {}",
-                    osPort.networkId());
-            return;
-        }
-
-        registerDhcpInfo(osPort);
-        ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
-        // TODO remove this and openstackPortInfo
-        String gatewayIp = osNet.subnets().stream().findFirst().get().gatewayIp();
-
-        // Added CREATE_TIME intentionally to trigger HOST_UPDATED event for the
-        // existing instances.
-        DefaultAnnotations.Builder annotations = DefaultAnnotations.builder()
-                .set(NETWORK_ID, osPort.networkId())
-                .set(PORT_ID, osPort.id())
-                .set(VXLAN_ID, osNet.segmentId())
-                .set(TENANT_ID, osNet.tenantId())
-                // TODO remove this and openstackPortInfo
-                .set(GATEWAY_IP, gatewayIp)
-                .set(CREATE_TIME, String.valueOf(System.currentTimeMillis()));
-
-        HostDescription hostDesc = new DefaultHostDescription(
-                osPort.macAddress(),
-                VlanId.NONE,
-                new HostLocation(connectPoint, System.currentTimeMillis()),
-                Sets.newHashSet(osPort.fixedIps().values()),
-                annotations.build());
-
-        HostId hostId = HostId.hostId(osPort.macAddress());
-        hostProvider.hostDetected(hostId, hostDesc, false);
-    }
-
-    private void processPortRemoved(Port port) {
-        ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
-        removeHosts(connectPoint);
-    }
-
-    private void removeHosts(ConnectPoint connectPoint) {
-        hostService.getConnectedHosts(connectPoint).stream()
-                .forEach(host -> {
-                    dhcpService.removeStaticMapping(host.mac());
-                    hostProvider.hostVanished(host.id());
-                });
-    }
-
-    private void registerDhcpInfo(OpenstackPort openstackPort) {
-        checkNotNull(openstackPort);
-        checkArgument(!openstackPort.fixedIps().isEmpty());
-
-        OpenstackSubnet openstackSubnet = openstackService.subnets().stream()
-                .filter(n -> n.networkId().equals(openstackPort.networkId()))
-                .findFirst().orElse(null);
-        if (openstackSubnet == null) {
-            log.warn("Failed to find subnet for {}", openstackPort);
-            return;
-        }
-
-        Ip4Address ipAddress = openstackPort.fixedIps().values().stream().findFirst().get();
-        IpPrefix subnetPrefix = IpPrefix.valueOf(openstackSubnet.cidr());
-        Ip4Address broadcast = Ip4Address.makeMaskedAddress(
-                ipAddress,
-                subnetPrefix.prefixLength());
-
-        // TODO: supports multiple DNS servers
-        Ip4Address domainServer = openstackSubnet.dnsNameservers().isEmpty() ?
-                DNS_SERVER_IP : openstackSubnet.dnsNameservers().get(0);
-
-        IpAssignment ipAssignment = IpAssignment.builder()
-                .ipAddress(ipAddress)
-                .leasePeriod(DHCP_INFINITE_LEASE)
-                .timestamp(new Date())
-                .subnetMask(Ip4Address.makeMaskPrefix(subnetPrefix.prefixLength()))
-                .broadcast(broadcast)
-                .domainServer(domainServer)
-                .assignmentStatus(Option_RangeNotEnforced)
-                .routerAddress(Ip4Address.valueOf(openstackSubnet.gatewayIp()))
-                .build();
-
-        dhcpService.setStaticMapping(openstackPort.macAddress(), ipAssignment);
-    }
-
-    private class InternalDeviceListener implements DeviceListener {
-
-        @Override
-        public void event(DeviceEvent event) {
-            Device device = event.subject();
-            if (!mastershipService.isLocalMaster(device.id())) {
-                // do not allow to proceed without mastership
-                return;
-            }
-
-            Port port = event.port();
-            if (port == null) {
-                return;
-            }
-
-            String portName = port.annotations().value(PORT_NAME);
-            if (Strings.isNullOrEmpty(portName) ||
-                    !portName.startsWith(PORTNAME_PREFIX_VM)) {
-                // handles VM connected port event only
-                return;
-            }
-
-            switch (event.type()) {
-                case PORT_UPDATED:
-                    if (!event.port().isEnabled()) {
-                        deviceEventExecutor.execute(() -> processPortRemoved(event.port()));
-                    }
-                    break;
-                case PORT_ADDED:
-                    deviceEventExecutor.execute(() -> processPortAdded(event.port()));
-                    break;
-                default:
-                    break;
-            }
-        }
-    }
-
-    private class InternalOpenstackNodeListener implements OpenstackNodeListener {
-
-        @Override
-        public void event(OpenstackNodeEvent event) {
-            OpenstackNode node = event.node();
-            // TODO check leadership of the node and make only the leader process
-
-            switch (event.type()) {
-                case COMPLETE:
-                    log.info("COMPLETE node {} detected", node.hostname());
-
-                    // adds existing VMs running on the complete state node
-                    deviceService.getPorts(node.intBridge()).stream()
-                            .filter(port -> port.annotations().value(PORT_NAME)
-                                    .startsWith(PORTNAME_PREFIX_VM) &&
-                                    port.isEnabled())
-                            .forEach(port -> {
-                                deviceEventExecutor.execute(() -> processPortAdded(port));
-                                log.info("VM is detected on {}", port);
-                            });
-
-                    // removes stale VMs
-                    hostService.getHosts().forEach(host -> {
-                        if (deviceService.getPort(host.location().deviceId(),
-                                host.location().port()) == null) {
-                            deviceEventExecutor.execute(() -> removeHosts(host.location()));
-                            log.info("Removed stale VM {}", host.location());
-                        }
-                    });
-                    break;
-                case INCOMPLETE:
-                    log.warn("{} is changed to INCOMPLETE state", node);
-                    break;
-                case INIT:
-                case DEVICE_CREATED:
-                default:
-                    break;
-            }
-        }
+    @Override
+    protected void hostRemoved(Host host) {
+        removeSwitchingRules(host); // removes the tunnel-tag and switching rules of this host
+        log.info("Removed virtual machine from switching service {}", host);
     }
 }