Add group table, flow rules by listening to openstack node events

Change-Id: Ifbb1ae9c812e9bc24260e960c17b5430dcf59a11
diff --git a/apps/openstackvtap/app/src/main/java/org/onosproject/openstackvtap/impl/DistributedOpenstackVtapStore.java b/apps/openstackvtap/app/src/main/java/org/onosproject/openstackvtap/impl/DistributedOpenstackVtapStore.java
index d6f63eb..c90571f 100644
--- a/apps/openstackvtap/app/src/main/java/org/onosproject/openstackvtap/impl/DistributedOpenstackVtapStore.java
+++ b/apps/openstackvtap/app/src/main/java/org/onosproject/openstackvtap/impl/DistributedOpenstackVtapStore.java
@@ -75,7 +75,7 @@
 
     private ConsistentMap<OpenstackVtapId, DefaultOpenstackVtap> vTapConsistentMap;
     private MapEventListener<OpenstackVtapId, DefaultOpenstackVtap>
-                                            vTapListener = new VTapEventListener();
+                                            vTapListener = new VtapEventListener();
     private Map<OpenstackVtapId, DefaultOpenstackVtap> vTapMap;
 
     private static final Serializer SERIALIZER = Serializer
@@ -115,7 +115,7 @@
 
         vTapStatusListener = status -> {
             if (status == Status.ACTIVE) {
-                eventExecutor.execute(this::loadVTapIds);
+                eventExecutor.execute(this::loadVtapIds);
             }
         };
         vTapConsistentMap.addStatusChangeListener(vTapStatusListener);
@@ -132,56 +132,6 @@
         log.info("Stopped {} - {}", this.getClass().getSimpleName());
     }
 
-    private void loadVTapIds() {
-        vTapIdsByTxDeviceId.clear();
-        vTapIdsByRxDeviceId.clear();
-        vTapMap.values().forEach(vTap -> refreshDeviceIdsByVtap(null, vTap));
-    }
-
-    private boolean shouldUpdate(DefaultOpenstackVtap existing,
-                                 OpenstackVtap description,
-                                 boolean replaceDevices) {
-        if (existing == null) {
-            return true;
-        }
-
-        if ((description.type() != null && !description.type().equals(existing.type()))
-                || (description.vTapCriterion() != null &&
-                !description.vTapCriterion().equals(existing.vTapCriterion()))) {
-            return true;
-        }
-
-        if (description.txDeviceIds() != null) {
-            if (replaceDevices) {
-                if (!existing.txDeviceIds().equals(description.txDeviceIds())) {
-                    return true;
-                }
-            } else {
-                if (!existing.txDeviceIds().containsAll(description.txDeviceIds())) {
-                    return true;
-                }
-            }
-        }
-
-        if (description.rxDeviceIds() != null) {
-            if (replaceDevices) {
-                if (!existing.rxDeviceIds().equals(description.rxDeviceIds())) {
-                    return true;
-                }
-            } else {
-                if (!existing.rxDeviceIds().containsAll(description.rxDeviceIds())) {
-                    return true;
-                }
-            }
-        }
-
-        // check to see if any of the annotations provided by vTap
-        // differ from those in the existing vTap
-        return description.annotations().keys().stream()
-                .anyMatch(k -> !Objects.equals(description.annotations().value(k),
-                        existing.annotations().value(k)));
-    }
-
     @Override
     public OpenstackVtap createOrUpdateVtap(OpenstackVtapId vTapId,
                                             OpenstackVtap description,
@@ -446,7 +396,57 @@
                         .collect(Collectors.toSet()));
     }
 
-    private class VTapComparator implements Comparator<OpenstackVtap> {
+    private void loadVtapIds() {
+        vTapIdsByTxDeviceId.clear();
+        vTapIdsByRxDeviceId.clear();
+        vTapMap.values().forEach(vTap -> refreshDeviceIdsByVtap(null, vTap));
+    }
+
+    private boolean shouldUpdate(DefaultOpenstackVtap existing,
+                                 OpenstackVtap description,
+                                 boolean replaceDevices) {
+        if (existing == null) {
+            return true;
+        }
+
+        if ((description.type() != null && !description.type().equals(existing.type()))
+                || (description.vTapCriterion() != null &&
+                !description.vTapCriterion().equals(existing.vTapCriterion()))) {
+            return true;
+        }
+
+        if (description.txDeviceIds() != null) {
+            if (replaceDevices) {
+                if (!existing.txDeviceIds().equals(description.txDeviceIds())) {
+                    return true;
+                }
+            } else {
+                if (!existing.txDeviceIds().containsAll(description.txDeviceIds())) {
+                    return true;
+                }
+            }
+        }
+
+        if (description.rxDeviceIds() != null) {
+            if (replaceDevices) {
+                if (!existing.rxDeviceIds().equals(description.rxDeviceIds())) {
+                    return true;
+                }
+            } else {
+                if (!existing.rxDeviceIds().containsAll(description.rxDeviceIds())) {
+                    return true;
+                }
+            }
+        }
+
+        // check to see if any of the annotations provided by vTap
+        // differ from those in the existing vTap
+        return description.annotations().keys().stream()
+                .anyMatch(k -> !Objects.equals(description.annotations().value(k),
+                        existing.annotations().value(k)));
+    }
+
+    private class VtapComparator implements Comparator<OpenstackVtap> {
         @Override
         public int compare(OpenstackVtap v1, OpenstackVtap v2) {
             int diff = (v2.type().compareTo(v1.type()));
@@ -531,7 +531,7 @@
         }
     }
 
-    private class VTapEventListener
+    private class VtapEventListener
             implements MapEventListener<OpenstackVtapId, DefaultOpenstackVtap> {
         @Override
         public void event(MapEvent<OpenstackVtapId, DefaultOpenstackVtap> event) {
@@ -540,7 +540,7 @@
             DefaultOpenstackVtap oldValue =
                     event.oldValue() != null ? event.oldValue().value() : null;
 
-            log.debug("VTapEventListener {} -> {}, {}", event.type(), oldValue, newValue);
+            log.debug("VtapEventListener {} -> {}, {}", event.type(), oldValue, newValue);
             switch (event.type()) {
                 case INSERT:
                     refreshDeviceIdsByVtap(oldValue, newValue);
diff --git a/apps/openstackvtap/app/src/main/java/org/onosproject/openstackvtap/impl/OpenstackVtapManager.java b/apps/openstackvtap/app/src/main/java/org/onosproject/openstackvtap/impl/OpenstackVtapManager.java
index 1f12dd1..843a602 100644
--- a/apps/openstackvtap/app/src/main/java/org/onosproject/openstackvtap/impl/OpenstackVtapManager.java
+++ b/apps/openstackvtap/app/src/main/java/org/onosproject/openstackvtap/impl/OpenstackVtapManager.java
@@ -18,12 +18,14 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
 import org.onlab.packet.VlanId;
 import org.onosproject.cluster.ClusterService;
@@ -34,6 +36,7 @@
 import org.onosproject.core.GroupId;
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Host;
 import org.onosproject.net.HostLocation;
@@ -57,31 +60,30 @@
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flow.instructions.ExtensionTreatment;
-import org.onosproject.net.flow.instructions.ExtensionTreatmentType;
 import org.onosproject.net.group.DefaultGroupBucket;
 import org.onosproject.net.group.DefaultGroupDescription;
-import org.onosproject.net.group.DefaultGroupKey;
 import org.onosproject.net.group.GroupBucket;
 import org.onosproject.net.group.GroupBuckets;
 import org.onosproject.net.group.GroupDescription;
-import org.onosproject.net.group.GroupKey;
 import org.onosproject.net.group.GroupService;
 import org.onosproject.net.host.HostEvent;
 import org.onosproject.net.host.HostListener;
 import org.onosproject.net.host.HostService;
+import org.onosproject.openstacknode.api.OpenstackNodeEvent;
+import org.onosproject.openstacknode.api.OpenstackNodeListener;
+import org.onosproject.openstacknode.api.OpenstackNodeService;
 import org.onosproject.openstackvtap.api.OpenstackVtap;
+import org.onosproject.openstackvtap.api.OpenstackVtap.Type;
 import org.onosproject.openstackvtap.api.OpenstackVtapAdminService;
 import org.onosproject.openstackvtap.api.OpenstackVtapCriterion;
 import org.onosproject.openstackvtap.api.OpenstackVtapEvent;
+import org.onosproject.openstackvtap.api.OpenstackVtapId;
 import org.onosproject.openstackvtap.api.OpenstackVtapListener;
 import org.onosproject.openstackvtap.api.OpenstackVtapService;
 import org.onosproject.openstackvtap.api.OpenstackVtapStore;
 import org.onosproject.openstackvtap.api.OpenstackVtapStoreDelegate;
-import org.onosproject.openstackvtap.api.OpenstackVtap.Type;
-import org.onosproject.openstackvtap.api.OpenstackVtapId;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
-import com.google.common.collect.Sets;
 
 import java.util.List;
 import java.util.Objects;
@@ -97,7 +99,9 @@
 import static org.onlab.packet.IPv4.PROTOCOL_ICMP;
 import static org.onlab.packet.IPv4.PROTOCOL_TCP;
 import static org.onlab.packet.IPv4.PROTOCOL_UDP;
+import static org.onlab.packet.VlanId.UNTAGGED;
 import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.flow.instructions.ExtensionTreatmentType.ExtensionTreatmentTypes.NICIRA_RESUBMIT_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.DHCP_ARP_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.FLAT_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.FORWARDING_TABLE;
@@ -110,6 +114,8 @@
 import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_GROUP_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_MIRROR_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_TABLE;
+import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
+import static org.onosproject.openstackvtap.util.OpenstackVtapUtil.getGroupKey;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -153,6 +159,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenstackVtapStore store;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected OpenstackNodeService osNodeService;
+
     public static final String APP_ID = "org.onosproject.openstackvtap";
 
     public static final String VTAP_ID_NULL = "OpenstackVtap ID cannot be null";
@@ -168,10 +177,9 @@
     private static final int FLAT_OUTBOUND_NEXT_TABLE = FLAT_TABLE;
     private static final int OUTBOUND_NEXT_TABLE = FORWARDING_TABLE;
 
-    private static final String VTAP_GROUP_KEY = "VTAP_GROUP_KEY";
-
     private final DeviceListener deviceListener = new InternalDeviceListener();
     private final HostListener hostListener = new InternalHostListener();
+    private final OpenstackNodeListener osNodeListener = new InternalOpenstackNodeListener();
 
     private OpenstackVtapStoreDelegate delegate = new InternalStoreDelegate();
 
@@ -194,12 +202,17 @@
 
         deviceService.addListener(deviceListener);
         hostService.addListener(hostListener);
+        osNodeService.addListener(osNodeListener);
+
+        // TODO: need to sweep through device store and add flow rules and
+        // group tables to mirror VM traffic
 
         log.info("Started {} - {}", appId.name(), this.getClass().getSimpleName());
     }
 
     @Deactivate
     public void deactivate() {
+        osNodeService.removeListener(osNodeListener);
         hostService.removeListener(hostListener);
         deviceService.removeListener(deviceListener);
 
@@ -209,6 +222,8 @@
         eventExecutor.shutdown();
         leadershipService.withdraw(appId.name());
 
+        // TODO: need to purge vtap related flow rules and group tables
+
         log.info("Stopped {} - {}", appId.name(), this.getClass().getSimpleName());
     }
 
@@ -229,30 +244,15 @@
     }
 
     @Override
-    public Set<OpenstackVtap> getVtapsByDeviceId(OpenstackVtap.Type type, DeviceId deviceId) {
+    public Set<OpenstackVtap> getVtapsByDeviceId(OpenstackVtap.Type type,
+                                                 DeviceId deviceId) {
         checkNotNull(deviceId, DEVICE_ID_NULL);
         return store.getVtapsByDeviceId(type, deviceId);
     }
 
-    private Set<DeviceId> getEdgeDevice(Type type, OpenstackVtapCriterion vTapCriterionOpenstack) {
-        Set<DeviceId> deviceIds = Sets.newConcurrentHashSet();
-        StreamSupport.stream(hostService.getHosts().spliterator(), true)
-                .forEach(host -> {
-                    if (host.ipAddresses().stream().anyMatch(ipAddress ->
-                            (type.isValid(Type.VTAP_TX) &&
-                                    vTapCriterionOpenstack.srcIpPrefix().contains(ipAddress)) ||
-                                    (type.isValid(Type.VTAP_RX) &&
-                                            vTapCriterionOpenstack.dstIpPrefix().contains(ipAddress)))) {
-                        deviceIds.addAll(host.locations().stream()
-                                .map(HostLocation::deviceId)
-                                .collect(Collectors.toSet()));
-                    }
-                });
-        return deviceIds;
-    }
-
     @Override
-    public OpenstackVtap createVtap(Type type, OpenstackVtapCriterion vTapCriterionOpenstack) {
+    public OpenstackVtap createVtap(Type type,
+                                    OpenstackVtapCriterion vTapCriterionOpenstack) {
         checkNotNull(vTapCriterionOpenstack, VTAP_DESC_NULL);
 
         Set<DeviceId> txDevices = type.isValid(Type.VTAP_TX) ?
@@ -260,13 +260,14 @@
         Set<DeviceId> rxDevices = type.isValid(Type.VTAP_RX) ?
                 getEdgeDevice(type, vTapCriterionOpenstack) : ImmutableSet.of();
 
-        OpenstackVtap description = DefaultOpenstackVtap.builder()
-                                                        .id(OpenstackVtapId.vTapId())
-                                                        .type(type)
-                                                        .vTapCriterion(vTapCriterionOpenstack)
-                                                        .txDeviceIds(txDevices)
-                                                        .rxDeviceIds(rxDevices)
-                                                        .build();
+        OpenstackVtap description =
+                            DefaultOpenstackVtap.builder()
+                                                .id(OpenstackVtapId.vTapId())
+                                                .type(type)
+                                                .vTapCriterion(vTapCriterionOpenstack)
+                                                .txDeviceIds(txDevices)
+                                                .rxDeviceIds(rxDevices)
+                                                .build();
         return store.createOrUpdateVtap(description.id(), description, true);
     }
 
@@ -284,13 +285,14 @@
         Set<DeviceId> rxDevices = vTap.type().isValid(Type.VTAP_RX) ?
                 getEdgeDevice(vTap.type(), vTap.vTapCriterion()) : ImmutableSet.of();
 
-        DefaultOpenstackVtap description = DefaultOpenstackVtap.builder()
-                                                                .id(vTapId)
-                                                                .type(vTap.type())
-                                                                .vTapCriterion(vTap.vTapCriterion())
-                                                                .txDeviceIds(txDevices)
-                                                                .rxDeviceIds(rxDevices)
-                                                                .build();
+        DefaultOpenstackVtap description =
+                            DefaultOpenstackVtap.builder()
+                                                .id(vTapId)
+                                                .type(vTap.type())
+                                                .vTapCriterion(vTap.vTapCriterion())
+                                                .txDeviceIds(txDevices)
+                                                .rxDeviceIds(rxDevices)
+                                                .build();
         return store.createOrUpdateVtap(vTapId, description, true);
     }
 
@@ -301,11 +303,14 @@
     }
 
     @Override
-    public void setVtapOutput(DeviceId deviceId, OpenstackVtap.Type type, PortNumber portNumber, VlanId vlanId) {
+    public void setVtapOutput(DeviceId deviceId, OpenstackVtap.Type type,
+                              PortNumber portNumber, VlanId vlanId) {
+
         // Make output table
         if (type.isValid(Type.VTAP_TX)) {
             createOutputTable(deviceId, VTAP_INBOUND_MIRROR_TABLE, portNumber, vlanId);
         }
+
         if (type.isValid(Type.VTAP_RX)) {
             createOutputTable(deviceId, VTAP_FLAT_OUTBOUND_MIRROR_TABLE, portNumber, vlanId);
             createOutputTable(deviceId, VTAP_OUTBOUND_MIRROR_TABLE, portNumber, vlanId);
@@ -317,58 +322,66 @@
         // TODO: need to provide implementation
     }
 
-    // Triggers driver setup when a host is (re)detected.
-    private class InternalHostListener implements HostListener {
-        @Override
-        public boolean isRelevant(HostEvent event) {
-            // do not allow to proceed without leadership
-            NodeId leader = leadershipService.getLeader(appId.name());
-            return Objects.equals(localNodeId, leader);
-        }
-
-        @Override
-        public void event(HostEvent event) {
-            HostEvent.Type type = event.type();
-            Host host = event.subject();
-            log.trace("InternalHostListener hostId={}, type={}", host.id(), type);
-
-            switch (type) {
-                case HOST_ADDED:
-                    eventExecutor.execute(() -> updateHost(host, null));
-                    break;
-
-                case HOST_REMOVED:
-                    eventExecutor.execute(() -> updateHost(null, host));
-                    break;
-
-                case HOST_UPDATED:
-                case HOST_MOVED:
-                    eventExecutor.execute(() -> updateHost(host, event.prevSubject()));
-                    break;
-                default:
-                    break;
-            }
-        }
+    /**
+     * Obtains the identifier set of edge device where the targeted host is located.
+     * Note that, in most of cases target host is attached to one device,
+     * however, in some cases, the host can be attached to multiple devices.
+     *
+     * @param type          vTap type
+     * @param criterion     vTap criterion
+     * @return a collection of device identifiers
+     */
+    private Set<DeviceId> getEdgeDevice(Type type, OpenstackVtapCriterion criterion) {
+        Set<DeviceId> deviceIds = Sets.newConcurrentHashSet();
+        StreamSupport.stream(hostService.getHosts().spliterator(), true)
+            .forEach(host -> {
+                if (host.ipAddresses().stream()
+                        .anyMatch(ip -> containsIp(type, criterion, ip))) {
+                    deviceIds.addAll(host.locations().stream()
+                                         .map(HostLocation::deviceId)
+                                         .collect(Collectors.toSet()));
+                }
+            });
+        return deviceIds;
     }
 
-    private boolean hostDifference(Host host1, Host host2, IpPrefix ipPrefix) {
-        return ((host1 != null && host1.ipAddresses().stream().anyMatch(ipPrefix::contains)) &&
-                (host2 == null || host2.ipAddresses().stream().noneMatch(ipPrefix::contains)));
+    /**
+     * Checks whether the given IP address is included in vTap criterion.
+     * We both check the TX and RX directions.
+     *
+     * @param type          vTap type
+     * @param criterion     vTap criterion
+     * @param ip            IP address
+     * @return boolean value indicates the check result
+     */
+    private boolean containsIp(Type type, OpenstackVtapCriterion criterion, IpAddress ip) {
+        boolean isTxEdge = type.isValid(Type.VTAP_TX) &&
+                                             criterion.srcIpPrefix().contains(ip);
+        boolean isRxEdge = type.isValid(Type.VTAP_RX) &&
+                                             criterion.dstIpPrefix().contains(ip);
+
+        return isTxEdge || isRxEdge;
     }
 
+    /**
+     * Updates device list of vTaps with respect to the host changes.
+     *
+     * @param newHost   new host instance
+     * @param oldHost   old host instance
+     */
     private void updateHost(Host newHost, Host oldHost) {
         // update devices for vTap tx
         getVtaps(Type.VTAP_TX).parallelStream().forEach(vTap -> {
-            if (hostDifference(oldHost, newHost, vTap.vTapCriterion().srcIpPrefix())) {
-                oldHost.locations().stream()
-                        .map(HostLocation::deviceId)
+
+            if (hostDiff(oldHost, newHost, vTap.vTapCriterion().srcIpPrefix())) {
+                oldHost.locations().stream().map(HostLocation::deviceId)
                         .forEach(deviceId ->
-                                store.removeDeviceFromVtap(vTap.id(),
-                                        Type.VTAP_TX, oldHost.location().deviceId()));
+                                store.removeDeviceFromVtap(vTap.id(), Type.VTAP_TX,
+                                        oldHost.location().deviceId()));
             }
-            if (hostDifference(newHost, oldHost, vTap.vTapCriterion().srcIpPrefix())) {
-                newHost.locations().stream()
-                        .map(HostLocation::deviceId)
+
+            if (hostDiff(newHost, oldHost, vTap.vTapCriterion().srcIpPrefix())) {
+                newHost.locations().stream().map(HostLocation::deviceId)
                         .forEach(deviceId ->
                                 store.addDeviceToVtap(vTap.id(), Type.VTAP_TX,
                                         newHost.location().deviceId()));
@@ -377,16 +390,16 @@
 
         // update devices for vTap rx
         getVtaps(Type.VTAP_RX).parallelStream().forEach(vTap -> {
-            if (hostDifference(oldHost, newHost, vTap.vTapCriterion().dstIpPrefix())) {
-                oldHost.locations().stream()
-                        .map(HostLocation::deviceId)
+
+            if (hostDiff(oldHost, newHost, vTap.vTapCriterion().dstIpPrefix())) {
+                oldHost.locations().stream().map(HostLocation::deviceId)
                         .forEach(deviceId ->
                                 store.removeDeviceFromVtap(vTap.id(), Type.VTAP_RX,
                                         oldHost.location().deviceId()));
             }
-            if (hostDifference(newHost, oldHost, vTap.vTapCriterion().dstIpPrefix())) {
-                newHost.locations().stream()
-                        .map(HostLocation::deviceId)
+
+            if (hostDiff(newHost, oldHost, vTap.vTapCriterion().dstIpPrefix())) {
+                newHost.locations().stream().map(HostLocation::deviceId)
                         .forEach(deviceId ->
                                 store.addDeviceToVtap(vTap.id(), Type.VTAP_RX,
                                         newHost.location().deviceId()));
@@ -394,35 +407,29 @@
         });
     }
 
-    // Triggers driver setup when a device is (re)detected.
-    private class InternalDeviceListener implements DeviceListener {
-        @Override
-        public boolean isRelevant(DeviceEvent event) {
-            // do not allow to proceed without Mastership
-            DeviceId deviceId = event.subject().id();
-            return mastershipService.isLocalMaster(deviceId);
-        }
-
-        @Override
-        public void event(DeviceEvent event) {
-            DeviceEvent.Type type = event.type();
-            DeviceId deviceId = event.subject().id();
-            log.trace("InternalDeviceListener deviceId={}, type={}", deviceId, type);
-
-            switch (type) {
-                case DEVICE_ADDED:
-                    eventExecutor.execute(() -> updateDevice(deviceId));
-                    break;
-                default:
-                    break;
-            }
-        }
+    /**
+     * Checks whether the given IP prefix is contained in the first host rather
+     * than in the second host.
+     *
+     * @param host1     first host instance
+     * @param host2     second host instance
+     * @param ipPrefix  IP prefix to be looked up
+     * @return boolean value
+     */
+    private boolean hostDiff(Host host1, Host host2, IpPrefix ipPrefix) {
+        return ((host1 != null && host1.ipAddresses().stream().anyMatch(ipPrefix::contains)) &&
+                (host2 == null || host2.ipAddresses().stream().noneMatch(ipPrefix::contains)));
     }
 
-    private void updateDevice(DeviceId deviceId) {
+    /**
+     * Initializes the flow rules and group table of the given device identifier.
+     *
+     * @param deviceId device identifier
+     */
+    private void initFlowAndGroupByDeviceId(DeviceId deviceId) {
         // Make vTap pipeline
         // TODO: need to selective creation by store device consistentMap
-        createVtapPipeline(deviceId);
+        initVtapPipeline(deviceId);
 
         // Install tx filter
         getVtapsByDeviceId(Type.VTAP_TX, deviceId).forEach(vTap -> {
@@ -442,33 +449,72 @@
         });
     }
 
-    private void clearRules() {
-        flowRuleService.removeFlowRulesById(appId);
-        deviceService.getDevices().forEach(device -> {
-            groupService.getGroups(device.id(), appId).forEach(group -> {
-                groupService.removeGroup(device.id(), group.appCookie(), appId);
-            });
+    /**
+     * Initializes vTap pipeline of the given device.
+     *
+     * @param deviceId device identifier
+     */
+    private void initVtapPipeline(DeviceId deviceId) {
+        // Make output table
+        createOutputTable(deviceId, VTAP_INBOUND_MIRROR_TABLE, null, null);
+        createOutputTable(deviceId, VTAP_FLAT_OUTBOUND_MIRROR_TABLE, null, null);
+        createOutputTable(deviceId, VTAP_OUTBOUND_MIRROR_TABLE, null, null);
+
+        // Make tx group table
+        createGroupTable(deviceId, VTAP_INBOUND_GROUP_TABLE,
+                ImmutableList.of(INBOUND_NEXT_TABLE, VTAP_INBOUND_MIRROR_TABLE),
+                ImmutableList.of());
+
+        // Make rx group table
+        createGroupTable(deviceId, VTAP_FLAT_OUTBOUND_GROUP_TABLE,
+                ImmutableList.of(FLAT_OUTBOUND_NEXT_TABLE, VTAP_FLAT_OUTBOUND_MIRROR_TABLE),
+                ImmutableList.of());
+        createGroupTable(deviceId, VTAP_OUTBOUND_GROUP_TABLE,
+                ImmutableList.of(OUTBOUND_NEXT_TABLE, VTAP_OUTBOUND_MIRROR_TABLE),
+                ImmutableList.of());
+    }
+
+    /**
+     * Purges all flow rules and group tables using the given device identifier.
+     *
+     * @param deviceId  device identifier
+     */
+    private void clearRulesGroupTable(DeviceId deviceId) {
+        Set<FlowRule> purgedRules = Sets.newConcurrentHashSet();
+        for (FlowRule flowRule : flowRuleService.getFlowRulesById(appId)) {
+            if (flowRule.deviceId().equals(deviceId)) {
+                purgedRules.add(flowRule);
+            }
+        }
+
+        flowRuleService.removeFlowRules((FlowRule[]) purgedRules.toArray());
+
+        groupService.getGroups(deviceId, appId).forEach(group -> {
+            groupService.removeGroup(deviceId, group.appCookie(), appId);
         });
-        log.info("OpenstackVtap rules are cleared.");
+        log.info("OpenstackVtap flow rules and groups are purged");
     }
 
     private void installFilterRule(Set<DeviceId> txDeviceIds, Set<DeviceId> rxDeviceIds,
-                                   OpenstackVtapCriterion vTapCriterionOpenstack, boolean install) {
+                                   OpenstackVtapCriterion vTapCriterion, boolean install) {
         final int inbound = 0;
         final int flatOutbound = 1;
         final int outbound = 2;
 
         BiFunction<Set<DeviceId>, Integer, Void> installFlow = (deviceIds, table) -> {
             int inTable = (table == inbound ? VTAP_INBOUND_TABLE :
-                    (table == flatOutbound ? VTAP_FLAT_OUTBOUND_TABLE : VTAP_OUTBOUND_TABLE));
+                    (table == flatOutbound ? VTAP_FLAT_OUTBOUND_TABLE :
+                                             VTAP_OUTBOUND_TABLE));
+
             int outGroup = (table == inbound ? VTAP_INBOUND_GROUP_TABLE :
-                    (table == flatOutbound ? VTAP_FLAT_OUTBOUND_GROUP_TABLE : VTAP_OUTBOUND_GROUP_TABLE));
+                    (table == flatOutbound ? VTAP_FLAT_OUTBOUND_GROUP_TABLE :
+                                             VTAP_OUTBOUND_GROUP_TABLE));
+
             deviceIds.stream()
                     .filter(deviceId -> mastershipService.isLocalMaster(deviceId))
                     .forEach(deviceId -> {
-                        connectTables(deviceId,
-                                inTable, NONE_TABLE, outGroup,
-                                vTapCriterionOpenstack, PRIORITY_VTAP_RULE, install);
+                        connectTables(deviceId, inTable, NONE_TABLE, outGroup,
+                                vTapCriterion, PRIORITY_VTAP_RULE, install);
                     });
             return null;
         };
@@ -478,72 +524,6 @@
         installFlow.apply(rxDeviceIds, outbound);
     }
 
-    // Store delegate to re-post events emitted from the store.
-    private class InternalStoreDelegate implements OpenstackVtapStoreDelegate {
-        @Override
-        public void notify(OpenstackVtapEvent event) {
-            OpenstackVtapEvent.Type type = event.type();
-            OpenstackVtap vTap = event.subject();
-            log.trace("VTapStoreDelegate vTap={}, type={}", vTap, type);
-
-            switch (type) {
-                case VTAP_ADDED:
-                    eventExecutor.execute(() -> {
-                        // Add new devices
-                        installFilterRule(vTap.txDeviceIds(), vTap.rxDeviceIds(),
-                                vTap.vTapCriterion(), true);
-                    });
-                    break;
-
-                case VTAP_UPDATED:
-                    OpenstackVtap oldOpenstackVtap = event.prevSubject();
-                    eventExecutor.execute(() -> {
-                        // Remove excluded devices
-                        installFilterRule(Sets.difference(oldOpenstackVtap.txDeviceIds(), vTap.txDeviceIds()),
-                                Sets.difference(oldOpenstackVtap.rxDeviceIds(), vTap.rxDeviceIds()),
-                                oldOpenstackVtap.vTapCriterion(), false);
-
-                        // Add new devices
-                        installFilterRule(Sets.difference(vTap.txDeviceIds(), oldOpenstackVtap.txDeviceIds()),
-                                Sets.difference(vTap.rxDeviceIds(), oldOpenstackVtap.rxDeviceIds()),
-                                vTap.vTapCriterion(), true);
-                    });
-                    break;
-
-                case VTAP_REMOVED:
-                    eventExecutor.execute(() -> {
-                        // Remove excluded devices
-                        installFilterRule(vTap.txDeviceIds(), vTap.rxDeviceIds(),
-                                vTap.vTapCriterion(), false);
-                    });
-                    break;
-                default:
-                    break;
-            }
-            post(event);
-        }
-    }
-
-    private void applyFlowRule(FlowRule flowRule, boolean install) {
-        FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
-
-        flowOpsBuilder = install ? flowOpsBuilder.add(flowRule) : flowOpsBuilder.remove(flowRule);
-
-        flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
-            @Override
-            public void onSuccess(FlowRuleOperations ops) {
-                log.trace("Provisioned vni or forwarding table");
-                //log.trace("{}", ops.toString());
-            }
-
-            @Override
-            public void onError(FlowRuleOperations ops) {
-                log.error("Failed to privision vni or forwarding table");
-                //log.error("{}", ops.toString());
-            }
-        }));
-    }
-
     private void connectTables(DeviceId deviceId, int fromTable, int toTable, int toGroup,
                                OpenstackVtapCriterion vTapCriterionOpenstack, int rulePriority,
                                boolean install) {
@@ -557,6 +537,8 @@
         switch (vTapCriterionOpenstack.ipProtocol()) {
             case PROTOCOL_TCP:
                 selectorBuilder.matchIPProtocol(vTapCriterionOpenstack.ipProtocol());
+
+                // Add port match only if the port number is greater than zero
                 if (vTapCriterionOpenstack.srcTpPort().toInt() > 0) {
                     selectorBuilder.matchTcpSrc(vTapCriterionOpenstack.srcTpPort());
                 }
@@ -566,6 +548,8 @@
                 break;
             case PROTOCOL_UDP:
                 selectorBuilder.matchIPProtocol(vTapCriterionOpenstack.ipProtocol());
+
+                // Add port match only if the port number is greater than zero
                 if (vTapCriterionOpenstack.srcTpPort().toInt() > 0) {
                     selectorBuilder.matchUdpSrc(vTapCriterionOpenstack.srcTpPort());
                 }
@@ -610,7 +594,7 @@
 
         // Set output port & vlan
         int priority = PRIORITY_VTAP_DROP;
-        if (vlanId != null) {
+        if (vlanId != null && vlanId.toShort() != UNTAGGED) {
             treatment.pushVlan().setVlanId(vlanId);
         }
         if (outPort != null) {
@@ -630,17 +614,15 @@
         applyFlowRule(flowRule, true);
     }
 
-    private GroupKey getGroupKey(int groupId) {
-        return new DefaultGroupKey((VTAP_GROUP_KEY + Integer.toString(groupId)).getBytes());
-    }
-
-    private ExtensionTreatment buildNiciraExtenstion(DeviceId id, int tableId) {
+    private ExtensionTreatment buildNiciraExtension(DeviceId id, int tableId) {
         Driver driver = driverService.getDriver(id);
-        DriverHandler driverHandler = new DefaultDriverHandler(new DefaultDriverData(driver, id));
-        ExtensionTreatmentResolver resolver = driverHandler.behaviour(ExtensionTreatmentResolver.class);
+        DriverHandler driverHandler =
+                    new DefaultDriverHandler(new DefaultDriverData(driver, id));
+        ExtensionTreatmentResolver resolver =
+                    driverHandler.behaviour(ExtensionTreatmentResolver.class);
 
-        ExtensionTreatment extensionInstruction = resolver.getExtensionInstruction(
-                ExtensionTreatmentType.ExtensionTreatmentTypes.NICIRA_RESUBMIT_TABLE.type());
+        ExtensionTreatment extensionInstruction =
+                    resolver.getExtensionInstruction(NICIRA_RESUBMIT_TABLE.type());
 
         try {
             extensionInstruction.setPropertyValue("table", ((short) tableId));
@@ -656,7 +638,7 @@
         List<GroupBucket> buckets = Lists.newArrayList();
         tableIds.forEach(tableId -> {
             TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder()
-                    .extension(buildNiciraExtenstion(deviceId, tableId), deviceId);
+                    .extension(buildNiciraExtension(deviceId, tableId), deviceId);
             GroupBucket bucket = DefaultGroupBucket
                     .createAllGroupBucket(treatment.build());
             buckets.add(bucket);
@@ -678,26 +660,156 @@
         groupService.addGroup(groupDescription);
     }
 
-    private void createVtapPipeline(DeviceId deviceId) {
-        // Clear all flow rules & group tables
-        clearRules();
+    private void applyFlowRule(FlowRule flowRule, boolean install) {
+        FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
 
-        // Make output table
-        createOutputTable(deviceId, VTAP_INBOUND_MIRROR_TABLE, null, null);
-        createOutputTable(deviceId, VTAP_FLAT_OUTBOUND_MIRROR_TABLE, null, null);
-        createOutputTable(deviceId, VTAP_OUTBOUND_MIRROR_TABLE, null, null);
+        flowOpsBuilder = install ? flowOpsBuilder.add(flowRule) : flowOpsBuilder.remove(flowRule);
 
-        // Make tx group table
-        createGroupTable(deviceId, VTAP_INBOUND_GROUP_TABLE,
-                ImmutableList.of(INBOUND_NEXT_TABLE, VTAP_INBOUND_MIRROR_TABLE),
-                ImmutableList.of());
+        flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
+            @Override
+            public void onSuccess(FlowRuleOperations ops) {
+                log.trace("Installed flow rules for tapping");
+            }
 
-        // Make rx group table
-        createGroupTable(deviceId, VTAP_FLAT_OUTBOUND_GROUP_TABLE,
-                ImmutableList.of(FLAT_OUTBOUND_NEXT_TABLE, VTAP_FLAT_OUTBOUND_MIRROR_TABLE),
-                ImmutableList.of());
-        createGroupTable(deviceId, VTAP_OUTBOUND_GROUP_TABLE,
-                ImmutableList.of(OUTBOUND_NEXT_TABLE, VTAP_OUTBOUND_MIRROR_TABLE),
-                ImmutableList.of());
+            @Override
+            public void onError(FlowRuleOperations ops) {
+                log.error("Failed to install flow rules for tapping");
+            }
+        }));
+    }
+
+    private class InternalDeviceListener implements DeviceListener {
+        @Override
+        public boolean isRelevant(DeviceEvent event) {
+            // do not allow to proceed without Mastership
+            DeviceId deviceId = event.subject().id();
+            return mastershipService.isLocalMaster(deviceId) &&
+                    event.subject().type() == Device.Type.SWITCH;
+        }
+
+        @Override
+        public void event(DeviceEvent event) {
+            DeviceEvent.Type type = event.type();
+            DeviceId deviceId = event.subject().id();
+            log.trace("InternalDeviceListener deviceId={}, type={}", deviceId, type);
+
+            switch (type) {
+                case DEVICE_ADDED:
+                    eventExecutor.execute(() -> initFlowAndGroupByDeviceId(deviceId));
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    private class InternalHostListener implements HostListener {
+        @Override
+        public boolean isRelevant(HostEvent event) {
+            // do not allow to proceed without leadership
+            NodeId leader = leadershipService.getLeader(appId.name());
+            return Objects.equals(localNodeId, leader);
+        }
+
+        @Override
+        public void event(HostEvent event) {
+            HostEvent.Type type = event.type();
+            Host host = event.subject();
+            log.trace("InternalHostListener hostId={}, type={}", host.id(), type);
+
+            switch (type) {
+                case HOST_ADDED:
+                    eventExecutor.execute(() -> updateHost(host, null));
+                    break;
+
+                case HOST_REMOVED:
+                    eventExecutor.execute(() -> updateHost(null, host));
+                    break;
+
+                case HOST_UPDATED:
+                case HOST_MOVED:
+                    eventExecutor.execute(() -> updateHost(host, event.prevSubject()));
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    private class InternalOpenstackNodeListener implements OpenstackNodeListener {
+
+        @Override
+        public boolean isRelevant(OpenstackNodeEvent event) {
+            // do not allow to proceed without leadership
+            NodeId leader = leadershipService.getLeader(appId.name());
+            return Objects.equals(localNodeId, leader) && event.subject().type() == COMPUTE;
+        }
+
+        @Override
+        public void event(OpenstackNodeEvent event) {
+            DeviceId deviceId = event.subject().intgBridge();
+            switch (event.type()) {
+                case OPENSTACK_NODE_CREATED:
+                case OPENSTACK_NODE_UPDATED:
+                    eventExecutor.execute(() -> initFlowAndGroupByDeviceId(deviceId));
+                    break;
+                case OPENSTACK_NODE_REMOVED:
+                    eventExecutor.execute(() -> clearRulesGroupTable(deviceId));
+                default:
+                    break;
+            }
+        }
+    }
+
+    // Store delegate to re-post events emitted from the store.
+    private class InternalStoreDelegate implements OpenstackVtapStoreDelegate {
+        @Override
+        public void notify(OpenstackVtapEvent event) {
+            OpenstackVtapEvent.Type type = event.type();
+            OpenstackVtap vTap = event.subject();
+            log.trace("vTapStoreDelegate vTap={}, type={}", vTap, type);
+
+            switch (type) {
+                case VTAP_ADDED:
+                    eventExecutor.execute(() -> {
+                        // Add new devices
+                        installFilterRule(vTap.txDeviceIds(), vTap.rxDeviceIds(),
+                                vTap.vTapCriterion(), true);
+                    });
+                    break;
+
+                case VTAP_UPDATED:
+                    OpenstackVtap oldOpenstackVtap = event.prevSubject();
+                    eventExecutor.execute(() -> {
+                        // Remove excluded devices
+                        installFilterRule(
+                                Sets.difference(oldOpenstackVtap.txDeviceIds(),
+                                                            vTap.txDeviceIds()),
+                                Sets.difference(oldOpenstackVtap.rxDeviceIds(),
+                                                            vTap.rxDeviceIds()),
+                                oldOpenstackVtap.vTapCriterion(), false);
+
+                        // Add new devices
+                        installFilterRule(
+                                Sets.difference(vTap.txDeviceIds(),
+                                                oldOpenstackVtap.txDeviceIds()),
+                                Sets.difference(vTap.rxDeviceIds(),
+                                                oldOpenstackVtap.rxDeviceIds()),
+                                vTap.vTapCriterion(), true);
+                    });
+                    break;
+
+                case VTAP_REMOVED:
+                    eventExecutor.execute(() -> {
+                        // Remove excluded devices
+                        installFilterRule(vTap.txDeviceIds(), vTap.rxDeviceIds(),
+                                vTap.vTapCriterion(), false);
+                    });
+                    break;
+                default:
+                    break;
+            }
+            post(event);
+        }
     }
 }