Added graceful shutdown for upstart service.
Reworked slightly the mastership & device managers and stores to make it work (sort-of) in the distributed env.
diff --git a/core/store/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index ee09570..77a28f5 100644
--- a/core/store/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -1,52 +1,74 @@
package org.onlab.onos.store.cluster.impl;
+import com.google.common.base.Optional;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
-import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
import com.hazelcast.core.Member;
+import com.hazelcast.core.MemberAttributeEvent;
+import com.hazelcast.core.MembershipEvent;
+import com.hazelcast.core.MembershipListener;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterStore;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.StoreService;
+import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
+import org.onlab.onos.store.impl.AbstractDistributedStore;
+import org.onlab.onos.store.impl.OptionalCacheLoader;
import org.onlab.packet.IpPrefix;
-import org.slf4j.Logger;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
-import static org.slf4j.LoggerFactory.getLogger;
+import static com.google.common.cache.CacheBuilder.newBuilder;
+import static org.onlab.onos.cluster.ControllerNode.State;
/**
* Distributed implementation of the cluster nodes store.
*/
@Component(immediate = true)
@Service
-public class DistributedClusterStore implements ClusterStore {
+public class DistributedClusterStore extends AbstractDistributedStore
+ implements ClusterStore {
- private final Logger log = getLogger(getClass());
+ private IMap<byte[], byte[]> rawNodes;
+ private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StoreService storeService;
-
- private HazelcastInstance theInstance;
-
- // FIXME: experimental implementation; enhance to assure persistence and
- // visibility to nodes that are not currently in the cluster
+ private String listenerId;
+ private final MembershipListener listener = new InnerMembershipListener();
+ private final Map<NodeId, State> states = new ConcurrentHashMap<>();
@Activate
public void activate() {
- log.info("Started");
- theInstance = storeService.getHazelcastInstance();
+ super.activate();
+ listenerId = theInstance.getCluster().addMembershipListener(listener);
+ rawNodes = theInstance.getMap("nodes");
+ OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
+ = new OptionalCacheLoader<>(storeService, rawNodes);
+ nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
+ rawNodes.addEntryListener(new RemoteEventHandler<>(nodes), true);
+
+ loadClusterNodes();
+
+ log.info("Started");
+ }
+
+ // Loads the initial set of cluster nodes
+ private void loadClusterNodes() {
+ for (Member member : theInstance.getCluster().getMembers()) {
+ addMember(member);
+ }
}
@Deactivate
public void deactivate() {
+ theInstance.getCluster().removeMembershipListener(listenerId);
log.info("Stopped");
}
@@ -58,30 +80,71 @@
@Override
public Set<ControllerNode> getNodes() {
ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
- for (Member member : theInstance.getCluster().getMembers()) {
- builder.add(node(member));
+ for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
+ builder.add(optional.get());
}
return builder.build();
}
@Override
public ControllerNode getNode(NodeId nodeId) {
- for (Member member : theInstance.getCluster().getMembers()) {
- if (member.getUuid().equals(nodeId.toString())) {
- return node(member);
- }
- }
- return null;
+ return nodes.getUnchecked(nodeId).orNull();
}
@Override
- public ControllerNode.State getState(NodeId nodeId) {
- return ControllerNode.State.ACTIVE;
+ public State getState(NodeId nodeId) {
+ State state = states.get(nodeId);
+ return state == null ? State.INACTIVE : state;
+ }
+
+ @Override
+ public void removeNode(NodeId nodeId) {
+ synchronized (this) {
+ rawNodes.remove(serialize(nodeId));
+ nodes.invalidate(nodeId);
+ }
+ }
+
+ // Adds a new node based on the specified member
+ private synchronized void addMember(Member member) {
+ DefaultControllerNode node = node(member);
+ rawNodes.put(serialize(node.id()), serialize(node));
+ nodes.put(node.id(), Optional.of(node));
+ states.put(node.id(), State.ACTIVE);
}
// Creates a controller node descriptor from the Hazelcast member.
- private ControllerNode node(Member member) {
- return new DefaultControllerNode(new NodeId(member.getUuid()),
- IpPrefix.valueOf(member.getSocketAddress().getAddress().getAddress()));
+ private DefaultControllerNode node(Member member) {
+ IpPrefix ip = memberAddress(member);
+ return new DefaultControllerNode(new NodeId(ip.toString()), ip);
+ }
+
+ private IpPrefix memberAddress(Member member) {
+ byte[] address = member.getSocketAddress().getAddress().getAddress();
+ return IpPrefix.valueOf(address);
+ }
+
+ // Interceptor for membership events.
+ private class InnerMembershipListener implements MembershipListener {
+ @Override
+ public void memberAdded(MembershipEvent membershipEvent) {
+ log.info("Member {} added", membershipEvent.getMember());
+ addMember(membershipEvent.getMember());
+ }
+
+ @Override
+ public void memberRemoved(MembershipEvent membershipEvent) {
+ log.info("Member {} removed", membershipEvent.getMember());
+ states.put(new NodeId(memberAddress(membershipEvent.getMember()).toString()),
+ State.INACTIVE);
+ }
+
+ @Override
+ public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
+ log.info("Member {} attribute {} changed to {}",
+ memberAttributeEvent.getMember(),
+ memberAttributeEvent.getKey(),
+ memberAttributeEvent.getValue());
+ }
}
}
diff --git a/core/store/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java b/core/store/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
new file mode 100644
index 0000000..92d6880
--- /dev/null
+++ b/core/store/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
@@ -0,0 +1,105 @@
+package org.onlab.onos.store.cluster.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.IMap;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.MastershipEvent;
+import org.onlab.onos.cluster.MastershipStore;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
+import org.onlab.onos.store.impl.AbstractDistributedStore;
+import org.onlab.onos.store.impl.OptionalCacheLoader;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static com.google.common.cache.CacheBuilder.newBuilder;
+
+/**
+ * Distributed implementation of the cluster nodes store.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedMastershipStore extends AbstractDistributedStore
+ implements MastershipStore {
+
+ private IMap<byte[], byte[]> rawMasters;
+ private LoadingCache<DeviceId, Optional<NodeId>> masters;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Activate
+ public void activate() {
+ super.activate();
+
+ rawMasters = theInstance.getMap("masters");
+ OptionalCacheLoader<DeviceId, NodeId> nodeLoader
+ = new OptionalCacheLoader<>(storeService, rawMasters);
+ masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
+ rawMasters.addEntryListener(new RemoteEventHandler<>(masters), true);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ @Override
+ public MastershipEvent setRole(NodeId nodeId, DeviceId deviceId, MastershipRole role) {
+ synchronized (this) {
+ NodeId currentMaster = getMaster(deviceId);
+ if (role == MastershipRole.MASTER && Objects.equals(currentMaster, nodeId)) {
+ return null;
+ }
+
+ // FIXME: for now implementing semantics of setMaster
+ rawMasters.put(serialize(deviceId), serialize(nodeId));
+ masters.put(deviceId, Optional.of(nodeId));
+ return new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, nodeId);
+ }
+ }
+
+ @Override
+ public NodeId getMaster(DeviceId deviceId) {
+ return masters.getUnchecked(deviceId).orNull();
+ }
+
+ @Override
+ public Set<DeviceId> getDevices(NodeId nodeId) {
+ ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
+ for (Map.Entry<DeviceId, Optional<NodeId>> entry : masters.asMap().entrySet()) {
+ if (nodeId.equals(entry.getValue().get())) {
+ builder.add(entry.getKey());
+ }
+ }
+ return builder.build();
+ }
+
+ @Override
+ public MastershipRole requestRole(DeviceId deviceId) {
+ // FIXME: for now we are 'selecting' as master whoever asks
+ setRole(clusterService.getLocalNode().id(), deviceId, MastershipRole.MASTER);
+ return MastershipRole.MASTER;
+ }
+
+ @Override
+ public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
+ NodeId master = masters.getUnchecked(deviceId).orNull();
+ return nodeId.equals(master) ? MastershipRole.MASTER : MastershipRole.STANDBY;
+ }
+
+}
diff --git a/core/store/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java b/core/store/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java
index 207036c..7dd827e 100644
--- a/core/store/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java
+++ b/core/store/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java
@@ -1,23 +1,15 @@
package org.onlab.onos.store.device.impl;
import com.google.common.base.Optional;
-import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
-import com.hazelcast.core.EntryAdapter;
-import com.hazelcast.core.EntryEvent;
-import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.ISet;
-import com.hazelcast.core.MapEvent;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultPort;
@@ -31,8 +23,8 @@
import org.onlab.onos.net.device.DeviceStore;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
-import org.onlab.onos.store.StoreService;
import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
+import org.onlab.onos.store.impl.AbstractDistributedStore;
import org.onlab.onos.store.impl.OptionalCacheLoader;
import org.slf4j.Logger;
@@ -47,17 +39,17 @@
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.cache.CacheBuilder.newBuilder;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
-
/**
* Manages inventory of infrastructure devices using Hazelcast-backed map.
*/
@Component(immediate = true)
@Service
-public class DistributedDeviceStore implements DeviceStore {
+public class DistributedDeviceStore extends AbstractDistributedStore
+ implements DeviceStore {
private final Logger log = getLogger(getClass());
@@ -79,16 +71,9 @@
private IMap<byte[], byte[]> rawDevicePorts;
private LoadingCache<DeviceId, Optional<Map<PortNumber, Port>>> devicePorts;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StoreService storeService;
-
- protected HazelcastInstance theInstance;
-
-
@Activate
public void activate() {
- log.info("Started");
- theInstance = storeService.getHazelcastInstance();
+ super.activate();
// IMap event handler needs value
final boolean includeValue = true;
@@ -96,40 +81,29 @@
// TODO decide on Map name scheme to avoid collision
rawDevices = theInstance.getMap("devices");
final OptionalCacheLoader<DeviceId, DefaultDevice> deviceLoader
- = new OptionalCacheLoader<>(storeService, rawDevices);
- devices = new AbsentInvalidatingLoadingCache<>(
- CacheBuilder.newBuilder()
- .build(deviceLoader));
+ = new OptionalCacheLoader<>(storeService, rawDevices);
+ devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader));
// refresh/populate cache based on notification from other instance
- rawDevices.addEntryListener(
- new RemoteEventHandler<>(devices),
- includeValue);
+ rawDevices.addEntryListener(new RemoteEventHandler<>(devices), includeValue);
rawRoles = theInstance.getMap("roles");
final OptionalCacheLoader<DeviceId, MastershipRole> rolesLoader
- = new OptionalCacheLoader<>(storeService, rawRoles);
- roles = new AbsentInvalidatingLoadingCache<>(
- CacheBuilder.newBuilder()
- .build(rolesLoader));
+ = new OptionalCacheLoader<>(storeService, rawRoles);
+ roles = new AbsentInvalidatingLoadingCache<>(newBuilder().build(rolesLoader));
// refresh/populate cache based on notification from other instance
- rawRoles.addEntryListener(
- new RemoteEventHandler<>(roles),
- includeValue);
+ rawRoles.addEntryListener(new RemoteEventHandler<>(roles), includeValue);
// TODO cache availableDevices
availableDevices = theInstance.getSet("availableDevices");
rawDevicePorts = theInstance.getMap("devicePorts");
final OptionalCacheLoader<DeviceId, Map<PortNumber, Port>> devicePortLoader
- = new OptionalCacheLoader<>(storeService, rawDevicePorts);
- devicePorts = new AbsentInvalidatingLoadingCache<>(
- CacheBuilder.newBuilder()
- .build(devicePortLoader));
+ = new OptionalCacheLoader<>(storeService, rawDevicePorts);
+ devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader));
// refresh/populate cache based on notification from other instance
- rawDevicePorts.addEntryListener(
- new RemoteEventHandler<>(devicePorts),
- includeValue);
+ rawDevicePorts.addEntryListener(new RemoteEventHandler<>(devicePorts), includeValue);
+ log.info("Started");
}
@Deactivate
@@ -369,25 +343,6 @@
}
@Override
- public MastershipRole getRole(DeviceId deviceId) {
- MastershipRole role = roles.getUnchecked(deviceId).orNull();
- return role != null ? role : MastershipRole.NONE;
- }
-
- @Override
- public DeviceEvent setRole(DeviceId deviceId, MastershipRole role) {
- synchronized (this) {
- Device device = getDevice(deviceId);
- checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
- MastershipRole oldRole = deserialize(
- rawRoles.put(serialize(deviceId), serialize(role)));
- roles.put(deviceId, Optional.of(role));
- return oldRole == role ? null :
- new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, device, null);
- }
- }
-
- @Override
public DeviceEvent removeDevice(DeviceId deviceId) {
synchronized (this) {
byte[] deviceIdBytes = serialize(deviceId);
@@ -403,54 +358,5 @@
}
// TODO cache serialized DeviceID if we suffer from serialization cost
- private byte[] serialize(final Object obj) {
- return storeService.serialize(obj);
- }
- private <T> T deserialize(final byte[] bytes) {
- return storeService.deserialize(bytes);
- }
-
- /**
- * An IMap EntryListener, which reflects each remote event to cache.
- *
- * @param <K> IMap key type after deserialization
- * @param <V> IMap value type after deserialization
- */
- public final class RemoteEventHandler<K, V> extends
- EntryAdapter<byte[], byte[]> {
-
- private LoadingCache<K, Optional<V>> cache;
-
- /**
- * Constructor.
- *
- * @param cache cache to update
- */
- public RemoteEventHandler(
- LoadingCache<K, Optional<V>> cache) {
- this.cache = checkNotNull(cache);
- }
-
- @Override
- public void mapCleared(MapEvent event) {
- cache.invalidateAll();
- }
-
- @Override
- public void entryUpdated(EntryEvent<byte[], byte[]> event) {
- cache.put(storeService.<K>deserialize(event.getKey()),
- Optional.of(storeService.<V>deserialize(event.getValue())));
- }
-
- @Override
- public void entryRemoved(EntryEvent<byte[], byte[]> event) {
- cache.invalidate(storeService.<K>deserialize(event.getKey()));
- }
-
- @Override
- public void entryAdded(EntryEvent<byte[], byte[]> event) {
- entryUpdated(event);
- }
- }
}
diff --git a/core/store/src/main/java/org/onlab/onos/store/impl/AbstractDistributedStore.java b/core/store/src/main/java/org/onlab/onos/store/impl/AbstractDistributedStore.java
new file mode 100644
index 0000000..e11dda3
--- /dev/null
+++ b/core/store/src/main/java/org/onlab/onos/store/impl/AbstractDistributedStore.java
@@ -0,0 +1,100 @@
+package org.onlab.onos.store.impl;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.LoadingCache;
+import com.hazelcast.core.EntryAdapter;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.MapEvent;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.onos.store.StoreService;
+import org.slf4j.Logger;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Abstraction of a distributed store based on Hazelcast.
+ */
+@Component(componentAbstract = true)
+public abstract class AbstractDistributedStore {
+
+ protected final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StoreService storeService;
+
+ protected HazelcastInstance theInstance;
+
+ @Activate
+ public void activate() {
+ theInstance = storeService.getHazelcastInstance();
+ }
+
+ /**
+ * Serializes the specified object using the backing store service.
+ *
+ * @param obj object to be serialized
+ * @return serialized object
+ */
+ protected byte[] serialize(Object obj) {
+ return storeService.serialize(obj);
+ }
+
+ /**
+ * Deserializes the specified object using the backing store service.
+ *
+ * @param bytes bytes to be deserialized
+ * @param <T> type of object
+ * @return deserialized object
+ */
+ protected <T> T deserialize(byte[] bytes) {
+ return storeService.deserialize(bytes);
+ }
+
+
+ /**
+ * An IMap entry listener, which reflects each remote event to the cache.
+ *
+ * @param <K> IMap key type after deserialization
+ * @param <V> IMap value type after deserialization
+ */
+ public final class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
+
+ private LoadingCache<K, Optional<V>> cache;
+
+ /**
+ * Constructor.
+ *
+ * @param cache cache to update
+ */
+ public RemoteEventHandler(LoadingCache<K, Optional<V>> cache) {
+ this.cache = checkNotNull(cache);
+ }
+
+ @Override
+ public void mapCleared(MapEvent event) {
+ cache.invalidateAll();
+ }
+
+ @Override
+ public void entryUpdated(EntryEvent<byte[], byte[]> event) {
+ cache.put(storeService.<K>deserialize(event.getKey()),
+ Optional.of(storeService.<V>deserialize(event.getValue())));
+ }
+
+ @Override
+ public void entryRemoved(EntryEvent<byte[], byte[]> event) {
+ cache.invalidate(storeService.<K>deserialize(event.getKey()));
+ }
+
+ @Override
+ public void entryAdded(EntryEvent<byte[], byte[]> event) {
+ entryUpdated(event);
+ }
+ }
+
+}
diff --git a/core/store/src/main/java/org/onlab/onos/store/impl/StoreManager.java b/core/store/src/main/java/org/onlab/onos/store/impl/StoreManager.java
index cfb80a0..82472b7 100644
--- a/core/store/src/main/java/org/onlab/onos/store/impl/StoreManager.java
+++ b/core/store/src/main/java/org/onlab/onos/store/impl/StoreManager.java
@@ -9,6 +9,9 @@
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
@@ -21,8 +24,11 @@
import org.onlab.onos.store.StoreService;
import org.onlab.onos.store.serializers.DefaultPortSerializer;
import org.onlab.onos.store.serializers.DeviceIdSerializer;
+import org.onlab.onos.store.serializers.IpPrefixSerializer;
+import org.onlab.onos.store.serializers.NodeIdSerializer;
import org.onlab.onos.store.serializers.PortNumberSerializer;
import org.onlab.onos.store.serializers.ProviderIdSerializer;
+import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,18 +71,21 @@
protected void setupKryoPool() {
// FIXME Slice out types used in common to separate pool/namespace.
serializerPool = KryoPool.newBuilder()
- .register(
- ArrayList.class,
- HashMap.class,
+ .register(ArrayList.class,
+ HashMap.class,
- Device.Type.class,
+ ControllerNode.State.class,
+ Device.Type.class,
- DefaultDevice.class,
- MastershipRole.class,
- Port.class,
- Element.class
+ DefaultControllerNode.class,
+ DefaultDevice.class,
+ MastershipRole.class,
+ Port.class,
+ Element.class
)
+ .register(IpPrefix.class, new IpPrefixSerializer())
.register(URI.class, new URISerializer())
+ .register(NodeId.class, new NodeIdSerializer())
.register(ProviderId.class, new ProviderIdSerializer())
.register(DeviceId.class, new DeviceIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
diff --git a/core/store/src/main/java/org/onlab/onos/store/serializers/NodeIdSerializer.java b/core/store/src/main/java/org/onlab/onos/store/serializers/NodeIdSerializer.java
new file mode 100644
index 0000000..ef9d3f1
--- /dev/null
+++ b/core/store/src/main/java/org/onlab/onos/store/serializers/NodeIdSerializer.java
@@ -0,0 +1,24 @@
+package org.onlab.onos.store.serializers;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.onlab.onos.cluster.NodeId;
+
+/**
+ * Kryo Serializer for {@link org.onlab.onos.cluster.NodeId}.
+ */
+public final class NodeIdSerializer extends Serializer<NodeId> {
+
+ @Override
+ public void write(Kryo kryo, Output output, NodeId object) {
+ kryo.writeObject(output, object.toString());
+ }
+
+ @Override
+ public NodeId read(Kryo kryo, Input input, Class<NodeId> type) {
+ final String id = kryo.readObject(input, String.class);
+ return new NodeId(id);
+ }
+}