Support existing VMs running on the newly added node

- Added openstack node state event
- Made openstack switching to listen to the node state events and
  add existing VMs running in the complete state node

Change-Id: I7b7186c3b889376a4bc0385313433604dcd93d70
diff --git a/apps/openstacknetworking/switching/src/main/java/org/onosproject/openstacknetworking/switching/OpenstackSwitchingManager.java b/apps/openstacknetworking/switching/src/main/java/org/onosproject/openstacknetworking/switching/OpenstackSwitchingManager.java
index 24ab533..5388e9b 100644
--- a/apps/openstacknetworking/switching/src/main/java/org/onosproject/openstacknetworking/switching/OpenstackSwitchingManager.java
+++ b/apps/openstacknetworking/switching/src/main/java/org/onosproject/openstacknetworking/switching/OpenstackSwitchingManager.java
@@ -56,6 +56,10 @@
 import org.onosproject.openstackinterface.OpenstackSubnet;
 import org.onosproject.openstacknetworking.OpenstackPortInfo;
 import org.onosproject.openstacknetworking.OpenstackSwitchingService;
+import org.onosproject.openstacknode.OpenstackNode;
+import org.onosproject.openstacknode.OpenstackNodeEvent;
+import org.onosproject.openstacknode.OpenstackNodeListener;
+import org.onosproject.openstacknode.OpenstackNodeService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,11 +107,15 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenstackInterfaceService openstackService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected OpenstackNodeService openstackNodeService;
+
     private final ExecutorService deviceEventExecutor =
             Executors.newSingleThreadExecutor(groupedThreads("onos/openstackswitching", "device-event"));
     private final ExecutorService configEventExecutor =
             Executors.newSingleThreadExecutor(groupedThreads("onos/openstackswitching", "config-event"));
     private final InternalDeviceListener internalDeviceListener = new InternalDeviceListener();
+    private final InternalOpenstackNodeListener internalNodeListener = new InternalOpenstackNodeListener();
 
     private HostProviderService hostProvider;
 
@@ -122,6 +130,7 @@
     protected void activate() {
         coreService.registerApplication(APP_ID);
         deviceService.addListener(internalDeviceListener);
+        openstackNodeService.addListener(internalNodeListener);
         hostProvider = hostProviderRegistry.register(this);
 
         log.info("Started");
@@ -131,6 +140,7 @@
     protected void deactivate() {
         hostProviderRegistry.unregister(this);
         deviceService.removeListener(internalDeviceListener);
+        openstackNodeService.removeListener(internalNodeListener);
 
         deviceEventExecutor.shutdown();
         configEventExecutor.shutdown();
@@ -179,6 +189,7 @@
     }
 
     private void processPortAdded(Port port) {
+        // TODO check the node state is COMPLETE
         OpenstackPort osPort = openstackService.port(port);
         if (osPort == null) {
             log.warn("Failed to get OpenStack port for {}", port);
@@ -221,6 +232,10 @@
 
     private void processPortRemoved(Port port) {
         ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
+        removeHosts(connectPoint);
+    }
+
+    private void removeHosts(ConnectPoint connectPoint) {
         hostService.getConnectedHosts(connectPoint).stream()
                 .forEach(host -> {
                     dhcpService.removeStaticMapping(host.mac());
@@ -300,4 +315,45 @@
             }
         }
     }
+
+    private class InternalOpenstackNodeListener implements OpenstackNodeListener {
+
+        @Override
+        public void event(OpenstackNodeEvent event) {
+            OpenstackNode node = event.node();
+            // TODO check leadership of the node and make only the leader process
+
+            switch (event.type()) {
+                case COMPLETE:
+                    log.info("COMPLETE node {} detected", node.hostname());
+
+                    // adds existing VMs running on the complete state node
+                    deviceService.getPorts(node.intBridge()).stream()
+                            .filter(port -> port.annotations().value(PORT_NAME)
+                                    .startsWith(PORTNAME_PREFIX_VM) &&
+                                    port.isEnabled())
+                            .forEach(port -> {
+                                deviceEventExecutor.execute(() -> processPortAdded(port));
+                                log.info("VM is detected on {}", port);
+                            });
+
+                    // removes stale VMs
+                    hostService.getHosts().forEach(host -> {
+                        if (deviceService.getPort(host.location().deviceId(),
+                                host.location().port()) == null) {
+                            deviceEventExecutor.execute(() -> removeHosts(host.location()));
+                            log.info("Removed stale VM {}", host.location());
+                        }
+                    });
+                    break;
+                case INCOMPLETE:
+                    log.warn("{} is changed to INCOMPLETE state", node);
+                    break;
+                case INIT:
+                case DEVICE_CREATED:
+                default:
+                    break;
+            }
+        }
+    }
 }