Support existing VMs running on the newly added node
- Added openstack node state event
- Made openstack switching listen to node state events and
add existing VMs running on a node that reaches the COMPLETE state
Change-Id: I7b7186c3b889376a4bc0385313433604dcd93d70
diff --git a/apps/openstacknetworking/switching/src/main/java/org/onosproject/openstacknetworking/switching/OpenstackSwitchingManager.java b/apps/openstacknetworking/switching/src/main/java/org/onosproject/openstacknetworking/switching/OpenstackSwitchingManager.java
index 24ab533..5388e9b 100644
--- a/apps/openstacknetworking/switching/src/main/java/org/onosproject/openstacknetworking/switching/OpenstackSwitchingManager.java
+++ b/apps/openstacknetworking/switching/src/main/java/org/onosproject/openstacknetworking/switching/OpenstackSwitchingManager.java
@@ -56,6 +56,10 @@
import org.onosproject.openstackinterface.OpenstackSubnet;
import org.onosproject.openstacknetworking.OpenstackPortInfo;
import org.onosproject.openstacknetworking.OpenstackSwitchingService;
+import org.onosproject.openstacknode.OpenstackNode;
+import org.onosproject.openstacknode.OpenstackNodeEvent;
+import org.onosproject.openstacknode.OpenstackNodeListener;
+import org.onosproject.openstacknode.OpenstackNodeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,11 +107,15 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenstackInterfaceService openstackService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected OpenstackNodeService openstackNodeService;
+
private final ExecutorService deviceEventExecutor =
Executors.newSingleThreadExecutor(groupedThreads("onos/openstackswitching", "device-event"));
private final ExecutorService configEventExecutor =
Executors.newSingleThreadExecutor(groupedThreads("onos/openstackswitching", "config-event"));
private final InternalDeviceListener internalDeviceListener = new InternalDeviceListener();
+ private final InternalOpenstackNodeListener internalNodeListener = new InternalOpenstackNodeListener();
private HostProviderService hostProvider;
@@ -122,6 +130,7 @@
protected void activate() {
coreService.registerApplication(APP_ID);
deviceService.addListener(internalDeviceListener);
+ openstackNodeService.addListener(internalNodeListener);
hostProvider = hostProviderRegistry.register(this);
log.info("Started");
@@ -131,6 +140,7 @@
protected void deactivate() {
hostProviderRegistry.unregister(this);
deviceService.removeListener(internalDeviceListener);
+ openstackNodeService.removeListener(internalNodeListener);
deviceEventExecutor.shutdown();
configEventExecutor.shutdown();
@@ -179,6 +189,7 @@
}
private void processPortAdded(Port port) {
+ // TODO check the node state is COMPLETE
OpenstackPort osPort = openstackService.port(port);
if (osPort == null) {
log.warn("Failed to get OpenStack port for {}", port);
@@ -221,6 +232,10 @@
private void processPortRemoved(Port port) {
ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
+ removeHosts(connectPoint);
+ }
+
+ private void removeHosts(ConnectPoint connectPoint) {
hostService.getConnectedHosts(connectPoint).stream()
.forEach(host -> {
dhcpService.removeStaticMapping(host.mac());
@@ -300,4 +315,45 @@
}
}
}
+
+    /** Resyncs hosts with the actual VM ports whenever an OpenStack node becomes COMPLETE. */
+    private class InternalOpenstackNodeListener implements OpenstackNodeListener {
+        @Override
+        public void event(OpenstackNodeEvent event) {
+            OpenstackNode node = event.node();
+            // TODO check leadership of the node and make only the leader process
+            switch (event.type()) {
+                case COMPLETE:
+                    log.info("COMPLETE node {} detected", node.hostname());
+                    // adds existing VMs running on the complete state node
+                    deviceService.getPorts(node.intBridge()).stream()
+                            .filter(this::isEnabledVmPort)
+                            .forEach(port -> {
+                                deviceEventExecutor.execute(() -> processPortAdded(port));
+                                log.info("VM is detected on {}", port);
+                            });
+                    // removes stale VMs: hosts whose port is no longer known to the device service
+                    hostService.getHosts().forEach(host -> {
+                        if (deviceService.getPort(host.location().deviceId(),
+                                                  host.location().port()) == null) {
+                            deviceEventExecutor.execute(() -> removeHosts(host.location()));
+                            log.info("Removed stale VM {}", host.location());
+                        }
+                    });
+                    break;
+                case INCOMPLETE:
+                    log.warn("{} is changed to INCOMPLETE state", node);
+                    break;
+                case INIT:
+                case DEVICE_CREATED:
+                default:
+                    break;
+            }
+        }
+        // true for enabled VM ports; a missing PORT_NAME annotation yields false, not an NPE
+        private boolean isEnabledVmPort(Port port) {
+            String name = port.annotations().value(PORT_NAME);
+            return name != null && name.startsWith(PORTNAME_PREFIX_VM) && port.isEnabled();
+        }
+    }
}