Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
Conflicts:
core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
Change-Id: I6a8b756fc20968e18ea3fd145e155d6282cea945
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/MastershipService.java b/core/api/src/main/java/org/onlab/onos/cluster/MastershipService.java
index be91609..d417516 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/MastershipService.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/MastershipService.java
@@ -56,7 +56,8 @@
Set<DeviceId> getDevicesOf(NodeId nodeId);
/**
- * Returns the mastership term service for getting term information.
+ * Returns the mastership term service for getting read-only
+ * term information.
*
* @return the MastershipTermService for this mastership manager
*/
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/MastershipStore.java b/core/api/src/main/java/org/onlab/onos/cluster/MastershipStore.java
index be5d873..bedc5e9 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/MastershipStore.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/MastershipStore.java
@@ -64,4 +64,14 @@
* @return the current master's ID and the term value for device, or null
*/
MastershipTerm getTermFor(DeviceId deviceId);
+
+ /**
+ * Revokes a controller instance's mastership over a device and hands
+ * over mastership to another controller instance.
+ *
+ * @param nodeId the controller instance identifier
+ * @param deviceId device to revoke mastership for
+ * @return a mastership event
+ */
+ MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId);
}
diff --git a/core/api/src/test/java/org/onlab/onos/cluster/MastershipTermTest.java b/core/api/src/test/java/org/onlab/onos/cluster/MastershipTermTest.java
new file mode 100644
index 0000000..139c695
--- /dev/null
+++ b/core/api/src/test/java/org/onlab/onos/cluster/MastershipTermTest.java
@@ -0,0 +1,32 @@
+package org.onlab.onos.cluster;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+import com.google.common.testing.EqualsTester;
+
+public class MastershipTermTest {
+
+ private static final NodeId N1 = new NodeId("foo");
+ private static final NodeId N2 = new NodeId("bar");
+
+ private static final MastershipTerm TERM1 = MastershipTerm.of(N1, 0);
+ private static final MastershipTerm TERM2 = MastershipTerm.of(N2, 1);
+ private static final MastershipTerm TERM3 = MastershipTerm.of(N2, 1);
+ private static final MastershipTerm TERM4 = MastershipTerm.of(N1, 1);
+
+ @Test
+ public void basics() {
+ assertEquals("incorrect term number", 0, TERM1.termNumber());
+ assertEquals("incorrect master", new NodeId("foo"), TERM1.master());
+ }
+
+ @Test
+ public void testEquality() {
+ new EqualsTester().addEqualityGroup(MastershipTerm.of(N1, 0), TERM1)
+ .addEqualityGroup(TERM2, TERM3)
+ .addEqualityGroup(TERM4);
+ }
+
+}
diff --git a/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
index 1a0c408..20ebc40 100644
--- a/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
+++ b/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
@@ -11,6 +11,8 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterEvent;
+import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.MastershipAdminService;
import org.onlab.onos.cluster.MastershipEvent;
@@ -52,9 +54,12 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
+ private ClusterEventListener clusterListener = new InternalClusterEventListener();
+
@Activate
public void activate() {
eventDispatcher.addSink(MastershipEvent.class, listenerRegistry);
+ clusterService.addListener(clusterListener);
store.setDelegate(delegate);
log.info("Started");
}
@@ -62,6 +67,7 @@
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(MastershipEvent.class);
+ clusterService.removeListener(clusterListener);
store.unsetDelegate(delegate);
log.info("Stopped");
}
@@ -71,9 +77,20 @@
checkNotNull(nodeId, NODE_ID_NULL);
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(role, ROLE_NULL);
- //TODO figure out appropriate action for non-MASTER roles, if we even set those
- if (role.equals(MastershipRole.MASTER)) {
- MastershipEvent event = store.setMaster(nodeId, deviceId);
+
+ MastershipRole current = store.getRole(nodeId, deviceId);
+ if (role.equals(current)) {
+ return;
+ } else {
+ MastershipEvent event = null;
+ if (role.equals(MastershipRole.MASTER)) {
+ //current was STANDBY, wanted MASTER
+ event = store.setMaster(nodeId, deviceId);
+ } else {
+ //current was MASTER, wanted STANDBY
+ event = store.unsetMaster(nodeId, deviceId);
+ }
+
if (event != null) {
post(event);
}
@@ -89,7 +106,18 @@
@Override
public void relinquishMastership(DeviceId deviceId) {
checkNotNull(deviceId, DEVICE_ID_NULL);
- // FIXME: add method to store to give up mastership and trigger new master selection process
+
+ MastershipRole role = store.getRole(
+ clusterService.getLocalNode().id(), deviceId);
+ if (!role.equals(MastershipRole.MASTER)) {
+ return;
+ }
+
+ MastershipEvent event = store.unsetMaster(
+ clusterService.getLocalNode().id(), deviceId);
+ if (event != null) {
+ post(event);
+ }
}
@Override
@@ -146,6 +174,26 @@
}
+ //callback for reacting to cluster events
+ private class InternalClusterEventListener implements ClusterEventListener {
+
+ @Override
+ public void event(ClusterEvent event) {
+ switch (event.type()) {
+ //FIXME: worry about addition when the time comes
+ case INSTANCE_ADDED:
+ case INSTANCE_ACTIVATED:
+ break;
+ case INSTANCE_REMOVED:
+ case INSTANCE_DEACTIVATED:
+ break;
+ default:
+ log.warn("unknown cluster event {}", event);
+ }
+ }
+
+ }
+
public class InternalDelegate implements MastershipStoreDelegate {
@Override
diff --git a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
index e7f2697..eb409a5 100644
--- a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
@@ -16,6 +16,7 @@
import org.onlab.onos.cluster.MastershipEvent;
import org.onlab.onos.cluster.MastershipListener;
import org.onlab.onos.cluster.MastershipService;
+import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
@@ -76,6 +77,8 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
+ protected MastershipTermService termService;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
@@ -84,6 +87,7 @@
store.setDelegate(delegate);
eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
mastershipService.addListener(mastershipListener);
+ termService = mastershipService.requestTermService();
log.info("Started");
}
@@ -208,8 +212,11 @@
checkNotNull(deviceId, DEVICE_ID_NULL);
checkValidity();
DeviceEvent event = store.markOffline(deviceId);
+
+ //we're no longer capable of mastership.
if (event != null) {
log.info("Device {} disconnected", deviceId);
+ mastershipService.relinquishMastership(deviceId);
post(event);
}
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java b/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
index 770f368..7ee6ddd 100644
--- a/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
+++ b/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
@@ -65,8 +65,8 @@
private volatile boolean isStarted = false;
private TopologyProviderService providerService;
- private DeviceListener deviceListener = new InnerDeviceListener();
- private LinkListener linkListener = new InnerLinkListener();
+ private DeviceListener deviceListener = new InternalDeviceListener();
+ private LinkListener linkListener = new InternalLinkListener();
private EventAccumulator accumulator;
private ExecutorService executor;
@@ -132,7 +132,7 @@
}
// Callback for device events
- private class InnerDeviceListener implements DeviceListener {
+ private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
DeviceEvent.Type type = event.type();
@@ -144,7 +144,7 @@
}
// Callback for link events
- private class InnerLinkListener implements LinkListener {
+ private class InternalLinkListener implements LinkListener {
@Override
public void event(LinkEvent event) {
accumulator.add(event);
diff --git a/core/net/src/test/java/org/onlab/onos/cluster/impl/MastershipManagerTest.java b/core/net/src/test/java/org/onlab/onos/cluster/impl/MastershipManagerTest.java
index d4a13ab..fd67681 100644
--- a/core/net/src/test/java/org/onlab/onos/cluster/impl/MastershipManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/cluster/impl/MastershipManagerTest.java
@@ -19,6 +19,7 @@
import org.onlab.packet.IpPrefix;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.onlab.onos.net.MastershipRole.*;
/**
@@ -65,7 +66,24 @@
@Test
public void relinquishMastership() {
- //TODO
+ //no backups - should turn to standby and no master for device
+ mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
+ assertEquals("wrong role:", MASTER, mgr.getLocalRole(DEV_MASTER));
+ mgr.relinquishMastership(DEV_MASTER);
+ assertNull("wrong master:", mgr.getMasterFor(DEV_MASTER));
+ assertEquals("wrong role:", STANDBY, mgr.getLocalRole(DEV_MASTER));
+
+ //not master, nothing should happen
+ mgr.setRole(NID_LOCAL, DEV_OTHER, STANDBY);
+ mgr.relinquishMastership(DEV_OTHER);
+ assertNull("wrong master:", mgr.getMasterFor(DEV_OTHER));
+
+ //provide NID_OTHER as backup and relinquish
+ mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
+ assertEquals("wrong master:", NID_LOCAL, mgr.getMasterFor(DEV_MASTER));
+ mgr.setRole(NID_OTHER, DEV_MASTER, STANDBY);
+ mgr.relinquishMastership(DEV_MASTER);
+ assertEquals("wrong master:", NID_OTHER, mgr.getMasterFor(DEV_MASTER));
}
@Test
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
index 4f6103c..ccb2937 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
@@ -123,6 +123,12 @@
return null;
}
+ @Override
+ public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> {
public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
super(cache);
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleMastershipStore.java b/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleMastershipStore.java
index 61dbe61..6690707 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleMastershipStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleMastershipStore.java
@@ -5,10 +5,9 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.felix.scr.annotations.Activate;
@@ -28,6 +27,8 @@
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
+import com.google.common.collect.Lists;
+
import static org.onlab.onos.cluster.MastershipEvent.Type.*;
/**
@@ -48,8 +49,10 @@
new DefaultControllerNode(new NodeId("local"), LOCALHOST);
//devices mapped to their masters, to emulate multiple nodes
- protected final ConcurrentMap<DeviceId, NodeId> masterMap =
- new ConcurrentHashMap<>();
+ protected final Map<DeviceId, NodeId> masterMap = new HashMap<>();
+ //emulate backups
+ protected final Map<DeviceId, List<NodeId>> backupMap = new HashMap<>();
+ //terms
protected final Map<DeviceId, AtomicInteger> termMap = new HashMap<>();
@Activate
@@ -65,24 +68,38 @@
@Override
public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
- NodeId node = masterMap.get(deviceId);
- if (node == null) {
- synchronized (this) {
- masterMap.put(deviceId, nodeId);
- termMap.put(deviceId, new AtomicInteger());
- }
- return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
- }
+ NodeId current = masterMap.get(deviceId);
+ List<NodeId> backups = backupMap.get(deviceId);
- if (node.equals(nodeId)) {
+ if (current == null) {
+ if (backups == null) {
+ //add new mapping to everything
+ synchronized (this) {
+ masterMap.put(deviceId, nodeId);
+ backups = Lists.newLinkedList();
+ backupMap.put(deviceId, backups);
+ termMap.put(deviceId, new AtomicInteger());
+ }
+ } else {
+ //set master to new node and remove from backups if there
+ synchronized (this) {
+ masterMap.put(deviceId, nodeId);
+ backups.remove(nodeId);
+ termMap.get(deviceId).incrementAndGet();
+ }
+ }
+ } else if (current.equals(nodeId)) {
return null;
} else {
- synchronized (this) {
- masterMap.put(deviceId, nodeId);
- termMap.get(deviceId).incrementAndGet();
- return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
- }
+ //add current to backup, set master to new node
+ masterMap.put(deviceId, nodeId);
+ backups.add(current);
+ backups.remove(nodeId);
+ termMap.get(deviceId).incrementAndGet();
}
+
+ updateStandby(nodeId, deviceId);
+ return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
}
@Override
@@ -108,29 +125,97 @@
@Override
public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
- NodeId node = masterMap.get(deviceId);
- MastershipRole role;
- if (node != null) {
- if (node.equals(nodeId)) {
- role = MastershipRole.MASTER;
- } else {
- role = MastershipRole.STANDBY;
+ NodeId current = masterMap.get(deviceId);
+ List<NodeId> backups = backupMap.get(deviceId);
+
+ if (current == null) {
+ //masterMap or backup doesn't contain device. Say new node is MASTER
+ if (backups == null) {
+ synchronized (this) {
+ masterMap.put(deviceId, nodeId);
+ backups = Lists.newLinkedList();
+ backupMap.put(deviceId, backups);
+ termMap.put(deviceId, new AtomicInteger());
+ }
+ updateStandby(nodeId, deviceId);
+ return MastershipRole.MASTER;
}
+
+ //device once existed, but got removed, and is now getting a backup.
+ if (!backups.contains(nodeId)) {
+ synchronized (this) {
+ backups.add(nodeId);
+ termMap.put(deviceId, new AtomicInteger());
+ }
+ updateStandby(nodeId, deviceId);
+ }
+
+ } else if (current.equals(nodeId)) {
+ return MastershipRole.MASTER;
} else {
- //masterMap doesn't contain it.
- role = MastershipRole.MASTER;
- masterMap.put(deviceId, nodeId);
+ //once created, a device never has a null backups list.
+ if (!backups.contains(nodeId)) {
+ //we must have requested STANDBY setting
+ synchronized (this) {
+ backups.add(nodeId);
+ termMap.put(deviceId, new AtomicInteger());
+ }
+ updateStandby(nodeId, deviceId);
+ }
}
- return role;
+
+ return MastershipRole.STANDBY;
}
@Override
public MastershipTerm getTermFor(DeviceId deviceId) {
- if (masterMap.get(deviceId) == null) {
+ if ((masterMap.get(deviceId) == null) ||
+ (termMap.get(deviceId) == null)) {
return null;
}
return MastershipTerm.of(
masterMap.get(deviceId), termMap.get(deviceId).get());
}
+ @Override
+ public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
+ NodeId node = masterMap.get(deviceId);
+
+ //TODO case where node is completely removed from the cluster?
+ if (node.equals(nodeId)) {
+ synchronized (this) {
+ //pick new node.
+ List<NodeId> backups = backupMap.get(deviceId);
+
+ //no backups, so device is hosed
+ if (backups.isEmpty()) {
+ masterMap.remove(deviceId);
+ backups.add(nodeId);
+ return null;
+ }
+ NodeId backup = backups.remove(0);
+ masterMap.put(deviceId, backup);
+ backups.add(nodeId);
+ return new MastershipEvent(MASTER_CHANGED, deviceId, backup);
+ }
+ }
+ return null;
+ }
+
+ //add node as STANDBY to maps un-scalably.
+ private void updateStandby(NodeId nodeId, DeviceId deviceId) {
+ for (Map.Entry<DeviceId, List<NodeId>> e : backupMap.entrySet()) {
+ DeviceId dev = e.getKey();
+ if (dev.equals(deviceId)) {
+ continue;
+ }
+ synchronized (this) {
+ List<NodeId> nodes = e.getValue();
+ if (!nodes.contains(nodeId)) {
+ nodes.add(nodeId);
+ }
+ }
+ }
+ }
+
}