Added graceful shutdown for upstart service.
Reworked slightly the mastership & device managers and stores to make it work (sort-of) in the distributed env.
diff --git a/core/net/src/main/java/org/onlab/onos/cluster/impl/ClusterManager.java b/core/net/src/main/java/org/onlab/onos/cluster/impl/ClusterManager.java
index 43d743c..aa21283 100644
--- a/core/net/src/main/java/org/onlab/onos/cluster/impl/ClusterManager.java
+++ b/core/net/src/main/java/org/onlab/onos/cluster/impl/ClusterManager.java
@@ -6,6 +6,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterAdminService;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
@@ -26,7 +27,7 @@
*/
@Component(immediate = true)
@Service
-public class ClusterManager implements ClusterService {
+public class ClusterManager implements ClusterService, ClusterAdminService {
public static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
private final Logger log = getLogger(getClass());
@@ -75,6 +76,12 @@
}
@Override
+ public void removeNode(NodeId nodeId) {
+ checkNotNull(nodeId, INSTANCE_ID_NULL);
+ store.removeNode(nodeId);
+ }
+
+ @Override
public void addListener(ClusterEventListener listener) {
listenerRegistry.addListener(listener);
}
diff --git a/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
index 8c3fd50..4ac6052 100644
--- a/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
+++ b/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
@@ -1,9 +1,5 @@
package org.onlab.onos.cluster.impl;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Set;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -14,8 +10,6 @@
import org.onlab.onos.cluster.MastershipAdminService;
import org.onlab.onos.cluster.MastershipEvent;
import org.onlab.onos.cluster.MastershipListener;
-import org.onlab.onos.cluster.MastershipProvider;
-import org.onlab.onos.cluster.MastershipProviderService;
import org.onlab.onos.cluster.MastershipService;
import org.onlab.onos.cluster.MastershipStore;
import org.onlab.onos.cluster.NodeId;
@@ -23,16 +17,16 @@
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
-import org.onlab.onos.net.provider.AbstractProviderRegistry;
-import org.onlab.onos.net.provider.AbstractProviderService;
import org.slf4j.Logger;
+import java.util.Set;
+
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@Service
public class MastershipManager
- extends AbstractProviderRegistry<MastershipProvider, MastershipProviderService>
implements MastershipService, MastershipAdminService {
private static final String NODE_ID_NULL = "Node ID cannot be null";
@@ -77,6 +71,24 @@
}
@Override
+ public MastershipRole getLocalRole(DeviceId deviceId) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ return store.getRole(clusterService.getLocalNode().id(), deviceId);
+ }
+
+ @Override
+ public void relinquishMastership(DeviceId deviceId) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ // FIXME: add method to store to give up mastership and trigger new master selection process
+ }
+
+ @Override
+ public MastershipRole requestRoleFor(DeviceId deviceId) {
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ return store.requestRole(deviceId);
+ }
+
+ @Override
public NodeId getMasterFor(DeviceId deviceId) {
checkNotNull(deviceId, DEVICE_ID_NULL);
return store.getMaster(deviceId);
@@ -89,13 +101,6 @@
}
@Override
- public MastershipRole requestRoleFor(DeviceId deviceId) {
- checkNotNull(deviceId, DEVICE_ID_NULL);
- NodeId id = clusterService.getLocalNode().id();
- return store.getRole(id, deviceId);
- }
-
- @Override
public void addListener(MastershipListener listener) {
checkNotNull(listener);
listenerRegistry.addListener(listener);
@@ -107,28 +112,7 @@
listenerRegistry.removeListener(listener);
}
- @Override
- protected MastershipProviderService createProviderService(
- MastershipProvider provider) {
- return new InternalMastershipProviderService(provider);
- }
-
- private class InternalMastershipProviderService
- extends AbstractProviderService<MastershipProvider>
- implements MastershipProviderService {
-
- protected InternalMastershipProviderService(MastershipProvider provider) {
- super(provider);
- }
-
- @Override
- public void roleChanged(NodeId nodeId, DeviceId deviceId, MastershipRole role) {
- // TODO Auto-generated method stub
- MastershipEvent event =
- store.addOrUpdateDevice(nodeId, deviceId, role);
- post(event);
- }
- }
+ // FIXME: provide wiring to allow events to be triggered by changes within the store
// Posts the specified event to the local event dispatcher.
private void post(MastershipEvent event) {
diff --git a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
index 179e214..2f61925 100644
--- a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
@@ -6,6 +6,9 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.MastershipEvent;
+import org.onlab.onos.cluster.MastershipListener;
import org.onlab.onos.cluster.MastershipService;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
@@ -54,6 +57,8 @@
protected final AbstractListenerRegistry<DeviceEvent, DeviceListener>
listenerRegistry = new AbstractListenerRegistry<>();
+ private final MastershipListener mastershipListener = new InnerMastershipListener();
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceStore store;
@@ -61,16 +66,21 @@
protected EventDeliveryService eventDispatcher;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@Activate
public void activate() {
eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
+ mastershipService.addListener(mastershipListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
+ mastershipService.removeListener(mastershipListener);
eventDispatcher.removeSink(DeviceEvent.class);
log.info("Stopped");
}
@@ -94,7 +104,7 @@
@Override
public MastershipRole getRole(DeviceId deviceId) {
checkNotNull(deviceId, DEVICE_ID_NULL);
- return store.getRole(deviceId);
+ return mastershipService.getLocalRole(deviceId);
}
@Override
@@ -116,18 +126,15 @@
return store.isAvailable(deviceId);
}
- @Override
- public void setRole(DeviceId deviceId, MastershipRole newRole) {
- checkNotNull(deviceId, DEVICE_ID_NULL);
- checkNotNull(newRole, ROLE_NULL);
- DeviceEvent event = store.setRole(deviceId, newRole);
- if (event != null) {
- Device device = event.subject();
+ // Applies the specified role to the device; ignores NONE
+ private void applyRole(DeviceId deviceId, MastershipRole newRole) {
+ if (newRole != MastershipRole.NONE) {
+ Device device = store.getDevice(deviceId);
DeviceProvider provider = getProvider(device.providerId());
if (provider != null) {
provider.roleChanged(device, newRole);
}
- post(event);
+ post(new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, device));
}
}
@@ -176,12 +183,9 @@
// If there was a change of any kind, trigger role selection process.
if (event != null) {
log.info("Device {} connected", deviceId);
- if (event.type().equals(DEVICE_ADDED)) {
- MastershipRole role = mastershipService.requestRoleFor(deviceId);
- store.setRole(deviceId, role);
- }
- Device device = event.subject();
- provider().roleChanged(device, store.getRole(device.id()));
+ mastershipService.requestRoleFor(deviceId);
+ provider().roleChanged(event.subject(),
+ mastershipService.getLocalRole(deviceId));
post(event);
}
}
@@ -229,4 +233,14 @@
}
}
+ // Intercepts mastership events
+ private class InnerMastershipListener implements MastershipListener {
+ @Override
+ public void event(MastershipEvent event) {
+ // FIXME: for now we're taking action only on becoming master
+ if (event.master().equals(clusterService.getLocalNode().id())) {
+ applyRole(event.subject(), MastershipRole.MASTER);
+ }
+ }
+ }
}