Added graceful shutdown for upstart service.
Reworked slightly the mastership & device managers and stores to make it work (sort-of) in the distributed env.
diff --git a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
index 179e214..2f61925 100644
--- a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
@@ -6,6 +6,9 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.MastershipEvent;
+import org.onlab.onos.cluster.MastershipListener;
import org.onlab.onos.cluster.MastershipService;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
@@ -54,6 +57,8 @@
protected final AbstractListenerRegistry<DeviceEvent, DeviceListener>
listenerRegistry = new AbstractListenerRegistry<>();
+ private final MastershipListener mastershipListener = new InnerMastershipListener();
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceStore store;
@@ -61,16 +66,21 @@
protected EventDeliveryService eventDispatcher;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@Activate
public void activate() {
eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
+ mastershipService.addListener(mastershipListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
+ mastershipService.removeListener(mastershipListener);
eventDispatcher.removeSink(DeviceEvent.class);
log.info("Stopped");
}
@@ -94,7 +104,7 @@
@Override
public MastershipRole getRole(DeviceId deviceId) {
checkNotNull(deviceId, DEVICE_ID_NULL);
- return store.getRole(deviceId);
+ return mastershipService.getLocalRole(deviceId);
}
@Override
@@ -116,18 +126,15 @@
return store.isAvailable(deviceId);
}
- @Override
- public void setRole(DeviceId deviceId, MastershipRole newRole) {
- checkNotNull(deviceId, DEVICE_ID_NULL);
- checkNotNull(newRole, ROLE_NULL);
- DeviceEvent event = store.setRole(deviceId, newRole);
- if (event != null) {
- Device device = event.subject();
+ // Applies the specified role to the device; ignores NONE
+ private void applyRole(DeviceId deviceId, MastershipRole newRole) {
+ if (newRole != MastershipRole.NONE) {
+ Device device = store.getDevice(deviceId);
DeviceProvider provider = getProvider(device.providerId());
if (provider != null) {
provider.roleChanged(device, newRole);
}
- post(event);
+ post(new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, device));
}
}
@@ -176,12 +183,9 @@
// If there was a change of any kind, trigger role selection process.
if (event != null) {
log.info("Device {} connected", deviceId);
- if (event.type().equals(DEVICE_ADDED)) {
- MastershipRole role = mastershipService.requestRoleFor(deviceId);
- store.setRole(deviceId, role);
- }
- Device device = event.subject();
- provider().roleChanged(device, store.getRole(device.id()));
+ mastershipService.requestRoleFor(deviceId);
+ provider().roleChanged(event.subject(),
+ mastershipService.getLocalRole(deviceId));
post(event);
}
}
@@ -229,4 +233,14 @@
}
}
+ // Intercepts mastership events
+ private class InnerMastershipListener implements MastershipListener {
+ @Override
+ public void event(MastershipEvent event) {
+ // FIXME: for now we're taking action only on becoming master
+ if (event.master().equals(clusterService.getLocalNode().id())) {
+ applyRole(event.subject(), MastershipRole.MASTER);
+ }
+ }
+ }
}