Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/MastershipService.java b/core/api/src/main/java/org/onlab/onos/cluster/MastershipService.java
index be91609..d417516 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/MastershipService.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/MastershipService.java
@@ -56,7 +56,8 @@
Set<DeviceId> getDevicesOf(NodeId nodeId);
/**
- * Returns the mastership term service for getting term information.
+ * Returns the mastership term service for getting read-only
+ * term information.
*
* @return the MastershipTermService for this mastership manager
*/
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/MastershipStore.java b/core/api/src/main/java/org/onlab/onos/cluster/MastershipStore.java
index be5d873..bedc5e9 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/MastershipStore.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/MastershipStore.java
@@ -64,4 +64,14 @@
* @return the current master's ID and the term value for device, or null
*/
MastershipTerm getTermFor(DeviceId deviceId);
+
+ /**
+ * Revokes a controller instance's mastership over a device and hands
+ * over mastership to another controller instance.
+ *
+ * @param nodeId the controller instance identifier
+ * @param deviceId device to revoke mastership for
+ * @return a mastership event
+ */
+ MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId);
}
diff --git a/core/api/src/test/java/org/onlab/onos/cluster/MastershipTermTest.java b/core/api/src/test/java/org/onlab/onos/cluster/MastershipTermTest.java
new file mode 100644
index 0000000..139c695
--- /dev/null
+++ b/core/api/src/test/java/org/onlab/onos/cluster/MastershipTermTest.java
@@ -0,0 +1,32 @@
+package org.onlab.onos.cluster;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+import com.google.common.testing.EqualsTester;
+
+public class MastershipTermTest {
+
+ private static final NodeId N1 = new NodeId("foo");
+ private static final NodeId N2 = new NodeId("bar");
+
+ private static final MastershipTerm TERM1 = MastershipTerm.of(N1, 0);
+ private static final MastershipTerm TERM2 = MastershipTerm.of(N2, 1);
+ private static final MastershipTerm TERM3 = MastershipTerm.of(N2, 1);
+ private static final MastershipTerm TERM4 = MastershipTerm.of(N1, 1);
+
+ @Test
+ public void basics() {
+ assertEquals("incorrect term number", 0, TERM1.termNumber());
+ assertEquals("incorrect master", new NodeId("foo"), TERM1.master());
+ }
+
+ @Test
+ public void testEquality() {
+ new EqualsTester().addEqualityGroup(MastershipTerm.of(N1, 0), TERM1)
+ .addEqualityGroup(TERM2, TERM3)
+ .addEqualityGroup(TERM4);
+ }
+
+}
diff --git a/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
index 1a0c408..a0da87c 100644
--- a/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
+++ b/core/net/src/main/java/org/onlab/onos/cluster/impl/MastershipManager.java
@@ -11,6 +11,8 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterEvent;
+import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.MastershipAdminService;
import org.onlab.onos.cluster.MastershipEvent;
@@ -52,9 +54,12 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
+ private ClusterEventListener clusterListener = new InternalClusterEventListener();
+
@Activate
public void activate() {
eventDispatcher.addSink(MastershipEvent.class, listenerRegistry);
+ clusterService.addListener(clusterListener);
store.setDelegate(delegate);
log.info("Started");
}
@@ -62,6 +67,7 @@
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(MastershipEvent.class);
+ clusterService.removeListener(clusterListener);
store.unsetDelegate(delegate);
log.info("Stopped");
}
@@ -71,12 +77,16 @@
checkNotNull(nodeId, NODE_ID_NULL);
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(role, ROLE_NULL);
- //TODO figure out appropriate action for non-MASTER roles, if we even set those
+
+ MastershipEvent event = null;
if (role.equals(MastershipRole.MASTER)) {
- MastershipEvent event = store.setMaster(nodeId, deviceId);
- if (event != null) {
- post(event);
- }
+ event = store.setMaster(nodeId, deviceId);
+ } else {
+ event = store.unsetMaster(nodeId, deviceId);
+ }
+
+ if (event != null) {
+ post(event);
}
}
@@ -88,8 +98,16 @@
@Override
public void relinquishMastership(DeviceId deviceId) {
- checkNotNull(deviceId, DEVICE_ID_NULL);
- // FIXME: add method to store to give up mastership and trigger new master selection process
+ MastershipRole role = getLocalRole(deviceId);
+ if (!role.equals(MastershipRole.MASTER)) {
+ return;
+ }
+
+ MastershipEvent event = store.unsetMaster(
+ clusterService.getLocalNode().id(), deviceId);
+ if (event != null) {
+ post(event);
+ }
}
@Override
@@ -146,6 +164,26 @@
}
+ //callback for reacting to cluster events
+ private class InternalClusterEventListener implements ClusterEventListener {
+
+ @Override
+ public void event(ClusterEvent event) {
+ switch (event.type()) {
+ //FIXME: worry about addition when the time comes
+ case INSTANCE_ADDED:
+ case INSTANCE_ACTIVATED:
+ break;
+ case INSTANCE_REMOVED:
+ case INSTANCE_DEACTIVATED:
+ break;
+ default:
+ log.warn("unknown cluster event {}", event);
+ }
+ }
+
+ }
+
public class InternalDelegate implements MastershipStoreDelegate {
@Override
diff --git a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
index e7f2697..f1567fd 100644
--- a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
@@ -16,6 +16,7 @@
import org.onlab.onos.cluster.MastershipEvent;
import org.onlab.onos.cluster.MastershipListener;
import org.onlab.onos.cluster.MastershipService;
+import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
@@ -76,6 +77,8 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
+ protected MastershipTermService termService;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
@@ -84,6 +87,7 @@
store.setDelegate(delegate);
eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
mastershipService.addListener(mastershipListener);
+ termService = mastershipService.requestTermService();
log.info("Started");
}
@@ -198,7 +202,7 @@
log.info("Device {} connected", deviceId);
mastershipService.requestRoleFor(deviceId);
provider().roleChanged(event.subject(),
- mastershipService.getLocalRole(deviceId));
+ mastershipService.requestRoleFor(deviceId));
post(event);
}
}
@@ -208,8 +212,11 @@
checkNotNull(deviceId, DEVICE_ID_NULL);
checkValidity();
DeviceEvent event = store.markOffline(deviceId);
+
+ //we're no longer capable of mastership.
if (event != null) {
log.info("Device {} disconnected", deviceId);
+ mastershipService.relinquishMastership(deviceId);
post(event);
}
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java b/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
index 770f368..7ee6ddd 100644
--- a/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
+++ b/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
@@ -65,8 +65,8 @@
private volatile boolean isStarted = false;
private TopologyProviderService providerService;
- private DeviceListener deviceListener = new InnerDeviceListener();
- private LinkListener linkListener = new InnerLinkListener();
+ private DeviceListener deviceListener = new InternalDeviceListener();
+ private LinkListener linkListener = new InternalLinkListener();
private EventAccumulator accumulator;
private ExecutorService executor;
@@ -132,7 +132,7 @@
}
// Callback for device events
- private class InnerDeviceListener implements DeviceListener {
+ private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
DeviceEvent.Type type = event.type();
@@ -144,7 +144,7 @@
}
// Callback for link events
- private class InnerLinkListener implements LinkListener {
+ private class InternalLinkListener implements LinkListener {
@Override
public void event(LinkEvent event) {
accumulator.add(event);
diff --git a/core/net/src/test/java/org/onlab/onos/cluster/impl/MastershipManagerTest.java b/core/net/src/test/java/org/onlab/onos/cluster/impl/MastershipManagerTest.java
index d4a13ab..d14df9c 100644
--- a/core/net/src/test/java/org/onlab/onos/cluster/impl/MastershipManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/cluster/impl/MastershipManagerTest.java
@@ -19,6 +19,7 @@
import org.onlab.packet.IpPrefix;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.onlab.onos.net.MastershipRole.*;
/**
@@ -65,7 +66,24 @@
@Test
public void relinquishMastership() {
- //TODO
+ //no backups - should turn to standby and no master for device
+ mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
+ assertEquals("wrong role:", MASTER, mgr.getLocalRole(DEV_MASTER));
+ mgr.relinquishMastership(DEV_MASTER);
+ assertNull("wrong master:", mgr.getMasterFor(DEV_OTHER));
+ assertEquals("wrong role:", STANDBY, mgr.getLocalRole(DEV_MASTER));
+
+ //not master, nothing should happen
+ mgr.setRole(NID_LOCAL, DEV_OTHER, STANDBY);
+ mgr.relinquishMastership(DEV_OTHER);
+ assertNull("wrong role:", mgr.getMasterFor(DEV_OTHER));
+
+ //provide NID_OTHER as backup and relinquish
+ mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
+ assertEquals("wrong master:", NID_LOCAL, mgr.getMasterFor(DEV_MASTER));
+ mgr.setRole(NID_OTHER, DEV_MASTER, STANDBY);
+ mgr.relinquishMastership(DEV_MASTER);
+ assertEquals("wrong master:", NID_OTHER, mgr.getMasterFor(DEV_MASTER));
}
@Test
@@ -95,7 +113,6 @@
mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
mgr.setRole(NID_LOCAL, DEV_OTHER, STANDBY);
assertEquals("should be one device:", 1, mgr.getDevicesOf(NID_LOCAL).size());
-
//hand both devices to NID_LOCAL
mgr.setRole(NID_LOCAL, DEV_OTHER, MASTER);
assertEquals("should be two devices:", 2, mgr.getDevicesOf(NID_LOCAL).size());
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java
new file mode 100644
index 0000000..398f8f7
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java
@@ -0,0 +1,47 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import static org.onlab.onos.store.cluster.messaging.MessageSubject.AE_ADVERTISEMENT;
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.Timestamp;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Anti-Entropy advertisement message.
+ *
+ * @param <ID> ID type
+ */
+public class AntiEntropyAdvertisement<ID> extends ClusterMessage {
+
+ private final NodeId sender;
+ private final ImmutableMap<ID, Timestamp> advertisement;
+
+ /**
+ * Creates anti-entropy advertisement message.
+ *
+ * @param sender sender of this message
+ * @param advertisement timestamp information of the data sender holds
+ */
+ public AntiEntropyAdvertisement(NodeId sender, Map<ID, Timestamp> advertisement) {
+ super(AE_ADVERTISEMENT);
+ this.sender = sender;
+ this.advertisement = ImmutableMap.copyOf(advertisement);
+ }
+
+ public NodeId sender() {
+ return sender;
+ }
+
+ public ImmutableMap<ID, Timestamp> advertisement() {
+ return advertisement;
+ }
+
+ // Default constructor for serializer
+ protected AntiEntropyAdvertisement() {
+ super(AE_ADVERTISEMENT);
+ this.sender = null;
+ this.advertisement = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
new file mode 100644
index 0000000..7a52e09
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
@@ -0,0 +1,55 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import static org.onlab.onos.store.cluster.messaging.MessageSubject.AE_REPLY;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.device.impl.VersionedValue;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class AntiEntropyReply<ID, VALUE> extends ClusterMessage {
+
+ private final NodeId sender;
+ private final ImmutableMap<ID, VersionedValue<VALUE>> suggestion;
+ private final ImmutableSet<ID> request;
+
+ /**
+ * Creates a reply to anti-entropy message.
+ *
+ * @param sender sender of this message
+ * @param suggestion collection of more recent values, sender had
+ * @param request Collection of identifiers
+ */
+ public AntiEntropyReply(NodeId sender,
+ Map<ID, VersionedValue<VALUE>> suggestion,
+ Set<ID> request) {
+ super(AE_REPLY);
+ this.sender = sender;
+ this.suggestion = ImmutableMap.copyOf(suggestion);
+ this.request = ImmutableSet.copyOf(request);
+ }
+
+ public NodeId sender() {
+ return sender;
+ }
+
+ public ImmutableMap<ID, VersionedValue<VALUE>> suggestion() {
+ return suggestion;
+ }
+
+ public ImmutableSet<ID> request() {
+ return request;
+ }
+
+ // Default constructor for serializer
+ protected AntiEntropyReply() {
+ super(AE_REPLY);
+ this.sender = null;
+ this.suggestion = null;
+ this.request = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
index c7badf2..97cbf1d 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
@@ -15,6 +15,12 @@
LEAVING_MEMBER,
/** Signifies a heart-beat message. */
- ECHO
+ ECHO,
+
+ /** Anti-Entropy advertisement message. */
+ AE_ADVERTISEMENT,
+
+ /** Anti-Entropy reply message. */
+ AE_REPLY,
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java
index 1a85c53..c18b4da 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/VersionedValue.java
@@ -42,4 +42,12 @@
public Timestamp timestamp() {
return timestamp;
}
+
+
+ // Default constructor for serializer
+ protected VersionedValue() {
+ this.entity = null;
+ this.isUp = false;
+ this.timestamp = null;
+ }
}
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
index 50c5f08..18e6e96 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
@@ -123,6 +123,12 @@
return null;
}
+ @Override
+ public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> {
public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
super(cache);
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java
new file mode 100644
index 0000000..244cc57
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java
@@ -0,0 +1,49 @@
+package org.onlab.onos.store.serializers;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.onlab.util.KryoPool.FamilySerializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.MapSerializer;
+import com.google.common.collect.ImmutableMap;
+
+/**
+* Kryo Serializer for {@link ImmutableMap}.
+*/
+public class ImmutableMapSerializer extends FamilySerializer<ImmutableMap<?, ?>> {
+
+ private final MapSerializer mapSerializer = new MapSerializer();
+
+ public ImmutableMapSerializer() {
+ // non-null, immutable
+ super(false, true);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, ImmutableMap<?, ?> object) {
+ // wrapping with unmodifiableMap proxy
+ // to avoid Kryo from writing only the reference marker of this instance,
+ // which will be embedded right before this method call.
+ kryo.writeObject(output, Collections.unmodifiableMap(object), mapSerializer);
+ }
+
+ @Override
+ public ImmutableMap<?, ?> read(Kryo kryo, Input input,
+ Class<ImmutableMap<?, ?>> type) {
+ Map<?, ?> map = kryo.readObject(input, HashMap.class, mapSerializer);
+ return ImmutableMap.copyOf(map);
+ }
+
+ @Override
+ public void registerFamilies(Kryo kryo) {
+ kryo.register(ImmutableMap.of().getClass(), this);
+ kryo.register(ImmutableMap.of(1, 2).getClass(), this);
+ kryo.register(ImmutableMap.of(1, 2, 3, 4).getClass(), this);
+ // TODO register required ImmutableMap variants
+ }
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java
new file mode 100644
index 0000000..c08bf9a
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java
@@ -0,0 +1,45 @@
+package org.onlab.onos.store.serializers;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.onlab.util.KryoPool.FamilySerializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
+import com.google.common.collect.ImmutableSet;
+
+/**
+* Kryo Serializer for {@link ImmutableSet}.
+*/
+public class ImmutableSetSerializer extends FamilySerializer<ImmutableSet<?>> {
+
+ private final CollectionSerializer serializer = new CollectionSerializer();
+
+ public ImmutableSetSerializer() {
+ // non-null, immutable
+ super(false, true);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, ImmutableSet<?> object) {
+ kryo.writeObject(output, object.asList(), serializer);
+ }
+
+ @Override
+ public ImmutableSet<?> read(Kryo kryo, Input input,
+ Class<ImmutableSet<?>> type) {
+ List<?> elms = kryo.readObject(input, ArrayList.class, serializer);
+ return ImmutableSet.copyOf(elms);
+ }
+
+ @Override
+ public void registerFamilies(Kryo kryo) {
+ kryo.register(ImmutableSet.of().getClass(), this);
+ kryo.register(ImmutableSet.of(1).getClass(), this);
+ kryo.register(ImmutableSet.of(1, 2).getClass(), this);
+ // TODO register required ImmutableSet variants
+ }
+}
diff --git a/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTests.java b/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTests.java
new file mode 100644
index 0000000..8e74072
--- /dev/null
+++ b/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTests.java
@@ -0,0 +1,128 @@
+package org.onlab.onos.store.serializers;
+
+import static org.onlab.onos.net.DeviceId.deviceId;
+import static org.onlab.onos.net.PortNumber.portNumber;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultDevice;
+import org.onlab.onos.net.DefaultLink;
+import org.onlab.onos.net.DefaultPort;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Link;
+import org.onlab.onos.net.LinkKey;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.packet.IpPrefix;
+import org.onlab.util.KryoPool;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.testing.EqualsTester;
+
+import de.javakaffee.kryoserializers.URISerializer;
+
+public class KryoSerializerTests {
+ private static final ProviderId PID = new ProviderId("of", "foo");
+ private static final DeviceId DID1 = deviceId("of:foo");
+ private static final DeviceId DID2 = deviceId("of:bar");
+ private static final PortNumber P1 = portNumber(1);
+ private static final PortNumber P2 = portNumber(2);
+ private static final ConnectPoint CP1 = new ConnectPoint(DID1, P1);
+ private static final ConnectPoint CP2 = new ConnectPoint(DID2, P2);
+ private static final String MFR = "whitebox";
+ private static final String HW = "1.1.x";
+ private static final String SW1 = "3.8.1";
+ private static final String SW2 = "3.9.5";
+ private static final String SN = "43311-12345";
+ private static final Device DEV1 = new DefaultDevice(PID, DID1, Device.Type.SWITCH, MFR, HW, SW1, SN);
+
+ private static KryoPool kryos;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ kryos = KryoPool.newBuilder()
+ .register(
+ ArrayList.class,
+ HashMap.class
+ )
+ .register(
+ Device.Type.class,
+ Link.Type.class
+
+// ControllerNode.State.class,
+// DefaultControllerNode.class,
+// MastershipRole.class,
+// Port.class,
+// Element.class,
+ )
+ .register(ConnectPoint.class, new ConnectPointSerializer())
+ .register(DefaultLink.class, new DefaultLinkSerializer())
+ .register(DefaultPort.class, new DefaultPortSerializer())
+ .register(DeviceId.class, new DeviceIdSerializer())
+ .register(ImmutableMap.class, new ImmutableMapSerializer())
+ .register(ImmutableSet.class, new ImmutableSetSerializer())
+ .register(IpPrefix.class, new IpPrefixSerializer())
+ .register(LinkKey.class, new LinkKeySerializer())
+ .register(NodeId.class, new NodeIdSerializer())
+ .register(PortNumber.class, new PortNumberSerializer())
+ .register(ProviderId.class, new ProviderIdSerializer())
+
+ .register(DefaultDevice.class)
+
+ .register(URI.class, new URISerializer())
+ .build();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // removing Kryo instance to use fresh Kryo on each tests
+ kryos.getKryo();
+ }
+
+ private static <T> void testSerialized(T original) {
+ ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
+ kryos.serialize(original, buffer);
+ buffer.flip();
+ T copy = kryos.deserialize(buffer);
+
+ new EqualsTester()
+ .addEqualityGroup(original, copy)
+ .testEquals();
+ }
+
+
+ @Test
+ public final void test() {
+ testSerialized(new ConnectPoint(DID1, P1));
+ testSerialized(new DefaultLink(PID, CP1, CP2, Link.Type.DIRECT));
+ testSerialized(new DefaultPort(DEV1, P1, true));
+ testSerialized(DID1);
+ testSerialized(ImmutableMap.of(DID1, DEV1, DID2, DEV1));
+ testSerialized(ImmutableMap.of(DID1, DEV1));
+ testSerialized(ImmutableMap.of());
+ testSerialized(ImmutableSet.of(DID1, DID2));
+ testSerialized(ImmutableSet.of(DID1));
+ testSerialized(ImmutableSet.of());
+ testSerialized(IpPrefix.valueOf("192.168.0.1/24"));
+ testSerialized(new LinkKey(CP1, CP2));
+ testSerialized(new NodeId("SomeNodeIdentifier"));
+ testSerialized(P1);
+ testSerialized(PID);
+ }
+
+}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleMastershipStore.java b/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleMastershipStore.java
index 61dbe61..09853fd 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleMastershipStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/net/trivial/impl/SimpleMastershipStore.java
@@ -7,8 +7,6 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.felix.scr.annotations.Activate;
@@ -48,8 +46,10 @@
new DefaultControllerNode(new NodeId("local"), LOCALHOST);
//devices mapped to their masters, to emulate multiple nodes
- protected final ConcurrentMap<DeviceId, NodeId> masterMap =
- new ConcurrentHashMap<>();
+ protected final Map<DeviceId, NodeId> masterMap = new HashMap<>();
+ //emulate backups with pile of nodes
+ protected final Set<NodeId> backups = new HashSet<>();
+ //terms
protected final Map<DeviceId, AtomicInteger> termMap = new HashMap<>();
@Activate
@@ -64,25 +64,29 @@
@Override
public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+ MastershipRole role = getRole(nodeId, deviceId);
- NodeId node = masterMap.get(deviceId);
- if (node == null) {
- synchronized (this) {
- masterMap.put(deviceId, nodeId);
- termMap.put(deviceId, new AtomicInteger());
- }
- return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
- }
-
- if (node.equals(nodeId)) {
- return null;
- } else {
- synchronized (this) {
- masterMap.put(deviceId, nodeId);
- termMap.get(deviceId).incrementAndGet();
- return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
+ synchronized (this) {
+ switch (role) {
+ case MASTER:
+ return null;
+ case STANDBY:
+ masterMap.put(deviceId, nodeId);
+ termMap.get(deviceId).incrementAndGet();
+ backups.add(nodeId);
+ break;
+ case NONE:
+ masterMap.put(deviceId, nodeId);
+ termMap.put(deviceId, new AtomicInteger());
+ backups.add(nodeId);
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ return null;
}
}
+
+ return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
}
@Override
@@ -103,34 +107,112 @@
@Override
public MastershipRole requestRole(DeviceId deviceId) {
- return getRole(instance.id(), deviceId);
+ //query+possible reelection
+ NodeId node = instance.id();
+ MastershipRole role = getRole(node, deviceId);
+
+ switch (role) {
+ case MASTER:
+ break;
+ case STANDBY:
+ synchronized (this) {
+ //try to "re-elect", since we're really not distributed
+ NodeId rel = reelect(node);
+ if (rel == null) {
+ masterMap.put(deviceId, node);
+ termMap.put(deviceId, new AtomicInteger());
+ role = MastershipRole.MASTER;
+ }
+ backups.add(node);
+ }
+ break;
+ case NONE:
+ //first to get to it, say we are master
+ synchronized (this) {
+ masterMap.put(deviceId, node);
+ termMap.put(deviceId, new AtomicInteger());
+ backups.add(node);
+ role = MastershipRole.MASTER;
+ }
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return role;
}
@Override
public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
- NodeId node = masterMap.get(deviceId);
+ //just query
+ NodeId current = masterMap.get(deviceId);
MastershipRole role;
- if (node != null) {
- if (node.equals(nodeId)) {
+
+ if (current == null) {
+ //degenerate case - only node is its own backup
+ if (backups.contains(nodeId)) {
+ role = MastershipRole.STANDBY;
+ } else {
+ role = MastershipRole.NONE;
+ }
+ } else {
+ if (current.equals(nodeId)) {
role = MastershipRole.MASTER;
} else {
role = MastershipRole.STANDBY;
}
- } else {
- //masterMap doesn't contain it.
- role = MastershipRole.MASTER;
- masterMap.put(deviceId, nodeId);
}
return role;
}
@Override
public MastershipTerm getTermFor(DeviceId deviceId) {
- if (masterMap.get(deviceId) == null) {
+ if ((masterMap.get(deviceId) == null) ||
+ (termMap.get(deviceId) == null)) {
return null;
}
return MastershipTerm.of(
masterMap.get(deviceId), termMap.get(deviceId).get());
}
+ @Override
+ public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
+ MastershipRole role = getRole(nodeId, deviceId);
+ synchronized (this) {
+ switch (role) {
+ case MASTER:
+ NodeId backup = reelect(nodeId);
+ if (backup == null) {
+ masterMap.remove(deviceId);
+ } else {
+ masterMap.put(deviceId, backup);
+ termMap.get(deviceId).incrementAndGet();
+ return new MastershipEvent(MASTER_CHANGED, deviceId, backup);
+ }
+ case STANDBY:
+ case NONE:
+ if (!termMap.containsKey(deviceId)) {
+ termMap.put(deviceId, new AtomicInteger());
+ }
+ backups.add(nodeId);
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ }
+ return null;
+ }
+
+ //dumbly selects next-available node that's not the current one
+ //emulate leader election
+ private NodeId reelect(NodeId nodeId) {
+ NodeId backup = null;
+ for (NodeId n : backups) {
+ if (!n.equals(nodeId)) {
+ backup = n;
+ break;
+ }
+ }
+ return backup;
+ }
+
}
diff --git a/tools/build/onos-package b/tools/build/onos-package
index cf751c7..2d6b954 100755
--- a/tools/build/onos-package
+++ b/tools/build/onos-package
@@ -34,6 +34,8 @@
mkdir -p $KARAF_DIST/system/org/onlab
cp -r $M2_REPO/org/onlab $KARAF_DIST/system/org/
+export ONOS_FEATURES="${ONOS_FEATURES:-webconsole,onos-api,onos-core,onos-cli,onos-rest,onos-gui,onos-openflow,onos-app-fwd,onos-app-foo}"
+
# Cellar Patching --------------------------------------------------------------
# Patch the Apache Karaf distribution file to add Cellar features repository
@@ -51,7 +53,7 @@
$ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg
# Patch the Apache Karaf distribution file to load ONOS features
-perl -pi.old -e 's|^(featuresBoot=.*)|\1,webconsole,onos-api,onos-core,onos-cli,onos-rest,onos-gui,onos-openflow,onos-app-fwd,onos-app-foo|' \
+perl -pi.old -e "s|^(featuresBoot=.*)|\1,$ONOS_FEATURES|" \
$ONOS_STAGE/$KARAF_DIST/etc/org.apache.karaf.features.cfg
# Patch the Apache Karaf distribution with ONOS branding bundle
diff --git a/tools/dev/bash_profile b/tools/dev/bash_profile
index 8bd4300..79c6bfb 100644
--- a/tools/dev/bash_profile
+++ b/tools/dev/bash_profile
@@ -66,6 +66,7 @@
env | egrep "OCI"
env | egrep "OC[0-9]+" | sort
env | egrep "OCN"
+ env | egrep "ONOS_" | egrep -v 'ONOS_ROOT|ONOS_CELL'
fi
}
diff --git a/tools/test/cells/.reset b/tools/test/cells/.reset
index c025225..5b1bc2c 100644
--- a/tools/test/cells/.reset
+++ b/tools/test/cells/.reset
@@ -1 +1 @@
-unset OC1 OC2 OC3 OC4 OC5 OC6 OC7 OC8 OC9 OCN ONOS_NIC
+unset OC1 OC2 OC3 OC4 OC5 OC6 OC7 OC8 OC9 OCN ONOS_NIC ONOS_FEATURES
diff --git a/tools/test/cells/office b/tools/test/cells/office
index 0edf04d..6053ec8 100644
--- a/tools/test/cells/office
+++ b/tools/test/cells/office
@@ -1,10 +1,9 @@
# ProxMox-based cell of ONOS instances 1,2 & ONOS mininet box
. $ONOS_ROOT/tools/test/cells/.reset
+export ONOS_FEATURES="webconsole,onos-api,onos-core-trivial,onos-cli,onos-openflow,onos-app-fwd,onos-app-mobility,onos-app-tvue"
+
export ONOS_NIC="10.128.4.*"
export OC1="10.128.4.60"
-#export OC2="192.168.97.131"
-
-#export OCN="192.168.97.130"
diff --git a/tools/test/cells/three b/tools/test/cells/three
new file mode 100644
index 0000000..38c934b
--- /dev/null
+++ b/tools/test/cells/three
@@ -0,0 +1,12 @@
+# Default virtual box ONOS instances 1,2 & ONOS mininet box
+
+export ONOS_NIC=192.168.56.*
+
+export ONOS_FEATURES="webconsole,onos-api,onos-core-trivial,onos-cli,onos-rest,onos-gui,onos-openflow,onos-app-fwd,onos-app-foo"
+
+export OC1="192.168.56.101"
+export OC2="192.168.56.102"
+export OC3="192.168.56.104"
+
+export OCN="192.168.56.103"
+
diff --git a/utils/misc/src/main/java/org/onlab/util/KryoPool.java b/utils/misc/src/main/java/org/onlab/util/KryoPool.java
index be662a6..3fae0c5 100644
--- a/utils/misc/src/main/java/org/onlab/util/KryoPool.java
+++ b/utils/misc/src/main/java/org/onlab/util/KryoPool.java
@@ -239,12 +239,41 @@
Kryo kryo = new Kryo();
kryo.setRegistrationRequired(registrationRequired);
for (Pair<Class<?>, Serializer<?>> registry : registeredTypes) {
- if (registry.getRight() == null) {
+ final Serializer<?> serializer = registry.getRight();
+ if (serializer == null) {
kryo.register(registry.getLeft());
} else {
- kryo.register(registry.getLeft(), registry.getRight());
+ kryo.register(registry.getLeft(), serializer);
+ if (serializer instanceof FamilySerializer) {
+ FamilySerializer<?> fser = (FamilySerializer<?>) serializer;
+ fser.registerFamilies(kryo);
+ }
}
}
return kryo;
}
+
+ /**
+ * Serializer implementation, which required registration of family of Classes.
+ * @param <T> base type of this serializer.
+ */
+ public abstract static class FamilySerializer<T> extends Serializer<T> {
+
+
+ public FamilySerializer(boolean acceptsNull) {
+ super(acceptsNull);
+ }
+
+ public FamilySerializer(boolean acceptsNull, boolean immutable) {
+ super(acceptsNull, immutable);
+ }
+
+ /**
+ * Registers other classes this Serializer supports.
+ *
+ * @param kryo instance to register classes to
+ */
+ public void registerFamilies(Kryo kryo) {
+ }
+ }
}