GossipHostStore: add AE support
- modified HostDescription family to hold Set of IpAddresses
Change-Id: Id920fdc83817802885e8528af185a5ad590bf999
diff --git a/core/api/src/main/java/org/onlab/onos/net/host/DefaultHostDescription.java b/core/api/src/main/java/org/onlab/onos/net/host/DefaultHostDescription.java
index 2e92dad..71a952e 100644
--- a/core/api/src/main/java/org/onlab/onos/net/host/DefaultHostDescription.java
+++ b/core/api/src/main/java/org/onlab/onos/net/host/DefaultHostDescription.java
@@ -1,5 +1,8 @@
package org.onlab.onos.net.host;
+import java.util.Collections;
+import java.util.Set;
+
import org.onlab.onos.net.AbstractDescription;
import org.onlab.onos.net.HostLocation;
import org.onlab.onos.net.SparseAnnotations;
@@ -7,6 +10,8 @@
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
+import com.google.common.collect.ImmutableSet;
+
import static com.google.common.base.MoreObjects.toStringHelper;
/**
@@ -18,7 +23,7 @@
private final MacAddress mac;
private final VlanId vlan;
private final HostLocation location;
- private final IpPrefix ip;
+ private final Set<IpPrefix> ip;
/**
* Creates a host description using the supplied information.
@@ -31,7 +36,7 @@
public DefaultHostDescription(MacAddress mac, VlanId vlan,
HostLocation location,
SparseAnnotations... annotations) {
- this(mac, vlan, location, null, annotations);
+ this(mac, vlan, location, Collections.<IpPrefix>emptySet(), annotations);
}
/**
@@ -46,11 +51,26 @@
public DefaultHostDescription(MacAddress mac, VlanId vlan,
HostLocation location, IpPrefix ip,
SparseAnnotations... annotations) {
+ this(mac, vlan, location, ImmutableSet.of(ip), annotations);
+ }
+
+ /**
+ * Creates a host description using the supplied information.
+ *
+ * @param mac host MAC address
+ * @param vlan host VLAN identifier
+ * @param location host location
+ * @param ip host IP addresses
+ * @param annotations optional key/value annotations map
+ */
+ public DefaultHostDescription(MacAddress mac, VlanId vlan,
+ HostLocation location, Set<IpPrefix> ip,
+ SparseAnnotations... annotations) {
super(annotations);
this.mac = mac;
this.vlan = vlan;
this.location = location;
- this.ip = ip;
+ this.ip = ImmutableSet.copyOf(ip);
}
@Override
@@ -69,7 +89,7 @@
}
@Override
- public IpPrefix ipAddress() {
+ public Set<IpPrefix> ipAddress() {
return ip;
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/host/HostDescription.java b/core/api/src/main/java/org/onlab/onos/net/host/HostDescription.java
index fc16854..258ce3d 100644
--- a/core/api/src/main/java/org/onlab/onos/net/host/HostDescription.java
+++ b/core/api/src/main/java/org/onlab/onos/net/host/HostDescription.java
@@ -1,5 +1,7 @@
package org.onlab.onos.net.host;
+import java.util.Set;
+
import org.onlab.onos.net.Description;
import org.onlab.onos.net.HostLocation;
import org.onlab.packet.IpPrefix;
@@ -38,6 +40,6 @@
* @return host IP address
*/
// FIXME: Switch to IpAddress
- IpPrefix ipAddress();
+ Set<IpPrefix> ipAddress();
}
diff --git a/core/api/src/test/java/org/onlab/onos/net/host/DefualtHostDecriptionTest.java b/core/api/src/test/java/org/onlab/onos/net/host/DefualtHostDecriptionTest.java
index 5ae7c27..f2b9475 100644
--- a/core/api/src/test/java/org/onlab/onos/net/host/DefualtHostDecriptionTest.java
+++ b/core/api/src/test/java/org/onlab/onos/net/host/DefualtHostDecriptionTest.java
@@ -8,6 +8,8 @@
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
+import com.google.common.collect.ImmutableSet;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -33,7 +35,7 @@
assertEquals("incorrect mac", MAC, host.hwAddress());
assertEquals("incorrect vlan", VLAN, host.vlan());
assertEquals("incorrect location", LOC, host.location());
- assertEquals("incorrect ip's", IP, host.ipAddress());
+ assertEquals("incorrect ip's", ImmutableSet.of(IP), host.ipAddress());
assertTrue("incorrect toString", host.toString().contains("vlan=10"));
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
index 39bc770..4025d0c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
@@ -1,10 +1,13 @@
package org.onlab.onos.store.host.impl;
+import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -12,6 +15,8 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Annotations;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultHost;
@@ -19,6 +24,7 @@
import org.onlab.onos.net.Host;
import org.onlab.onos.net.HostId;
import org.onlab.onos.net.HostLocation;
+import org.onlab.onos.net.host.DefaultHostDescription;
import org.onlab.onos.net.host.HostClockService;
import org.onlab.onos.net.host.HostDescription;
import org.onlab.onos.net.host.HostEvent;
@@ -42,12 +48,19 @@
import org.slf4j.Logger;
import java.io.IOException;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
import static org.onlab.onos.net.host.HostEvent.Type.*;
+import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
//TODO: multi-provider, annotation not supported.
@@ -88,24 +101,58 @@
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(DistributedStoreSerializers.COMMON)
+ .register(InternalHostEvent.class)
.register(InternalHostRemovedEvent.class)
+ .register(HostFragmentId.class)
+ .register(HostAntiEntropyAdvertisement.class)
.build()
.populate(1);
}
};
+ private ScheduledExecutorService executor;
+
@Activate
public void activate() {
clusterCommunicator.addSubscriber(
- GossipHostStoreMessageSubjects.HOST_UPDATED, new InternalHostEventListener());
+ GossipHostStoreMessageSubjects.HOST_UPDATED,
+ new InternalHostEventListener());
clusterCommunicator.addSubscriber(
- GossipHostStoreMessageSubjects.HOST_REMOVED, new InternalHostRemovedEventListener());
+ GossipHostStoreMessageSubjects.HOST_REMOVED,
+ new InternalHostRemovedEventListener());
+ clusterCommunicator.addSubscriber(
+ GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
+ new InternalHostAntiEntropyAdvertisementListener());
+
+ executor =
+ newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
+
+ // TODO: Make these configurable
+ long initialDelaySec = 5;
+ long periodSec = 5;
+ // start anti-entropy thread
+ executor.scheduleAtFixedRate(new SendAdvertisementTask(),
+ initialDelaySec, periodSec, TimeUnit.SECONDS);
log.info("Started");
}
@Deactivate
public void deactivate() {
+ executor.shutdownNow();
+ try {
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ log.error("Timeout during executor shutdown");
+ }
+ } catch (InterruptedException e) {
+ log.error("Error during executor shutdown", e);
+ }
+
+ hosts.clear();
+ removedHosts.clear();
+ locations.clear();
+ portAddresses.clear();
+
log.info("Stopped");
}
@@ -153,7 +200,7 @@
descr.hwAddress(),
descr.vlan(),
new Timestamped<>(descr.location(), timestamp),
- ImmutableSet.of(descr.ipAddress()));
+ ImmutableSet.copyOf(descr.ipAddress()));
hosts.put(hostId, newhost);
locations.put(descr.location(), newhost);
return new HostEvent(HOST_ADDED, newhost);
@@ -169,12 +216,12 @@
return new HostEvent(HOST_MOVED, host);
}
- if (host.ipAddresses().contains(descr.ipAddress())) {
+ if (host.ipAddresses().containsAll(descr.ipAddress())) {
return null;
}
Set<IpPrefix> addresses = new HashSet<>(host.ipAddresses());
- addresses.add(descr.ipAddress());
+ addresses.addAll(descr.ipAddress());
StoredHost updated = new StoredHost(providerId, host.id(),
host.mac(), host.vlan(),
host.location, addresses);
@@ -381,6 +428,10 @@
public HostLocation location() {
return location.value();
}
+
+ public Timestamp timestamp() {
+ return location.timestamp();
+ }
}
private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
@@ -399,6 +450,16 @@
clusterCommunicator.broadcast(message);
}
+ private void unicastMessage(NodeId peer,
+ MessageSubject subject,
+ Object event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ SERIALIZER.encode(event));
+ clusterCommunicator.unicast(message, peer);
+ }
+
private void notifyDelegateIfNotNull(HostEvent event) {
if (event != null) {
notifyDelegate(event);
@@ -434,4 +495,165 @@
notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
}
}
+
+ private final class SendAdvertisementTask implements Runnable {
+
+ @Override
+ public void run() {
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("Interrupted, quitting");
+ return;
+ }
+
+ try {
+ final NodeId self = clusterService.getLocalNode().id();
+ Set<ControllerNode> nodes = clusterService.getNodes();
+
+ ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
+ .transform(toNodeId())
+ .toList();
+
+ if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
+ log.debug("No other peers in the cluster.");
+ return;
+ }
+
+ NodeId peer;
+ do {
+ int idx = RandomUtils.nextInt(0, nodeIds.size());
+ peer = nodeIds.get(idx);
+ } while (peer.equals(self));
+
+ HostAntiEntropyAdvertisement ad = createAdvertisement();
+
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("Interrupted, quitting");
+ return;
+ }
+
+ try {
+ unicastMessage(peer, GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
+ } catch (IOException e) {
+ log.debug("Failed to send anti-entropy advertisement", e);
+ return;
+ }
+ } catch (Exception e) {
+ // catch all Exception to avoid Scheduled task being suppressed.
+ log.error("Exception thrown while sending advertisement", e);
+ }
+ }
+ }
+
+ private HostAntiEntropyAdvertisement createAdvertisement() {
+ final NodeId self = clusterService.getLocalNode().id();
+
+ Map<HostFragmentId, Timestamp> timestamps = new HashMap<>(hosts.size());
+ Map<HostId, Timestamp> tombstones = new HashMap<>(removedHosts.size());
+
+ for (Entry<HostId, StoredHost> e : hosts.entrySet()) {
+
+ final HostId hostId = e.getKey();
+ final StoredHost hostInfo = e.getValue();
+ final ProviderId providerId = hostInfo.providerId();
+ timestamps.put(new HostFragmentId(hostId, providerId), hostInfo.timestamp());
+ }
+
+ for (Entry<HostId, Timestamped<Host>> e : removedHosts.entrySet()) {
+ tombstones.put(e.getKey(), e.getValue().timestamp());
+ }
+
+ return new HostAntiEntropyAdvertisement(self, timestamps, tombstones);
+ }
+
+ private synchronized void handleAntiEntropyAdvertisement(HostAntiEntropyAdvertisement ad) {
+
+ final NodeId sender = ad.sender();
+
+ for (Entry<HostId, StoredHost> host : hosts.entrySet()) {
+ // for each locally live Hosts...
+ final HostId hostId = host.getKey();
+ final StoredHost localHost = host.getValue();
+ final ProviderId providerId = localHost.providerId();
+ final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
+ final Timestamp localLiveTimestamp = localHost.timestamp();
+
+ Timestamp remoteTimestamp = ad.timestamps().get(hostFragId);
+ if (remoteTimestamp == null) {
+ remoteTimestamp = ad.tombstones().get(hostId);
+ }
+ if (remoteTimestamp == null ||
+ localLiveTimestamp.compareTo(remoteTimestamp) > 0) {
+
+ // local is more recent, push
+ // TODO: annotation is lost
+ final HostDescription desc = new DefaultHostDescription(
+ localHost.mac(),
+ localHost.vlan(),
+ localHost.location(),
+ localHost.ipAddresses());
+ try {
+ unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_UPDATED,
+ new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
+ } catch (IOException e1) {
+ log.debug("Failed to send advertisement response", e1);
+ }
+ }
+
+ final Timestamp remoteDeadTimestamp = ad.tombstones().get(hostId);
+ if (remoteDeadTimestamp != null &&
+ remoteDeadTimestamp.compareTo(localLiveTimestamp) > 0) {
+ // sender has recent remove
+ notifyDelegateIfNotNull(removeHostInternal(hostId, remoteDeadTimestamp));
+ }
+ }
+
+ for (Entry<HostId, Timestamped<Host>> dead : removedHosts.entrySet()) {
+ // for each locally dead Hosts
+ final HostId hostId = dead.getKey();
+ final Timestamp localDeadTimestamp = dead.getValue().timestamp();
+
+ // TODO: pick proper ProviderId, when supporting multi-provider
+ final ProviderId providerId = dead.getValue().value().providerId();
+ final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
+
+ final Timestamp remoteLiveTimestamp = ad.timestamps().get(hostFragId);
+ if (remoteLiveTimestamp != null &&
+ localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
+ // sender has zombie, push
+ try {
+ unicastMessage(sender, GossipHostStoreMessageSubjects.HOST_REMOVED,
+ new InternalHostRemovedEvent(hostId, localDeadTimestamp));
+ } catch (IOException e1) {
+ log.debug("Failed to send advertisement response", e1);
+ }
+ }
+ }
+
+
+ for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
+ // for each remote tombstone advertisement...
+ final HostId hostId = e.getKey();
+ final Timestamp adRemoveTimestamp = e.getValue();
+
+ final StoredHost storedHost = hosts.get(hostId);
+ if (storedHost == null) {
+ continue;
+ }
+ if (adRemoveTimestamp.compareTo(storedHost.timestamp()) > 0) {
+ // sender has recent remove info, locally remove
+ notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
+ }
+ }
+ }
+
+ private final class InternalHostAntiEntropyAdvertisementListener implements
+ ClusterMessageHandler {
+
+ @Override
+ public void handle(ClusterMessage message) {
+ log.debug("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
+ HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
+ handleAntiEntropyAdvertisement(advertisement);
+ }
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStoreMessageSubjects.java
index 27cf4ce..0a9f0e0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStoreMessageSubjects.java
@@ -4,6 +4,11 @@
public final class GossipHostStoreMessageSubjects {
private GossipHostStoreMessageSubjects() {}
- public static final MessageSubject HOST_UPDATED = new MessageSubject("peer-host-updated");
- public static final MessageSubject HOST_REMOVED = new MessageSubject("peer-host-removed");
+
+ public static final MessageSubject HOST_UPDATED
+ = new MessageSubject("peer-host-updated");
+ public static final MessageSubject HOST_REMOVED
+ = new MessageSubject("peer-host-removed");
+ public static final MessageSubject HOST_ANTI_ENTROPY_ADVERTISEMENT
+ = new MessageSubject("host-enti-entropy-advertisement");;
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/HostAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/HostAntiEntropyAdvertisement.java
new file mode 100644
index 0000000..6139005
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/HostAntiEntropyAdvertisement.java
@@ -0,0 +1,48 @@
+package org.onlab.onos.store.host.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.HostId;
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Host AE Advertisement message.
+ */
+public final class HostAntiEntropyAdvertisement {
+
+ private final NodeId sender;
+ private final Map<HostFragmentId, Timestamp> timestamps;
+ private final Map<HostId, Timestamp> tombstones;
+
+
+ public HostAntiEntropyAdvertisement(NodeId sender,
+ Map<HostFragmentId, Timestamp> timestamps,
+ Map<HostId, Timestamp> tombstones) {
+ this.sender = checkNotNull(sender);
+ this.timestamps = checkNotNull(timestamps);
+ this.tombstones = checkNotNull(tombstones);
+ }
+
+ public NodeId sender() {
+ return sender;
+ }
+
+ public Map<HostFragmentId, Timestamp> timestamps() {
+ return timestamps;
+ }
+
+ public Map<HostId, Timestamp> tombstones() {
+ return tombstones;
+ }
+
+ // For serializer
+ @SuppressWarnings("unused")
+ private HostAntiEntropyAdvertisement() {
+ this.sender = null;
+ this.timestamps = null;
+ this.tombstones = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/HostFragmentId.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/HostFragmentId.java
new file mode 100644
index 0000000..34dbff6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/HostFragmentId.java
@@ -0,0 +1,62 @@
+package org.onlab.onos.store.host.impl;
+
+import java.util.Objects;
+
+import org.onlab.onos.net.HostId;
+import org.onlab.onos.net.provider.ProviderId;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Identifier for HostDescription from a Provider.
+ */
+public final class HostFragmentId {
+ public final ProviderId providerId;
+ public final HostId hostId;
+
+ public HostFragmentId(HostId hostId, ProviderId providerId) {
+ this.providerId = providerId;
+ this.hostId = hostId;
+ }
+
+ public HostId hostId() {
+ return hostId;
+ }
+
+ public ProviderId providerId() {
+ return providerId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(providerId, hostId);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof HostFragmentId)) {
+ return false;
+ }
+ HostFragmentId that = (HostFragmentId) obj;
+ return Objects.equals(this.hostId, that.hostId) &&
+ Objects.equals(this.providerId, that.providerId);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("providerId", providerId)
+ .add("hostId", hostId)
+ .toString();
+ }
+
+ // for serializer
+ @SuppressWarnings("unused")
+ private HostFragmentId() {
+ this.providerId = null;
+ this.hostId = null;
+ }
+}
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/host/impl/DistributedHostStore.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/host/impl/DistributedHostStore.java
index 0ca4ae2..bd4b3bf 100644
--- a/core/store/hz/net/src/main/java/org/onlab/onos/store/host/impl/DistributedHostStore.java
+++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/host/impl/DistributedHostStore.java
@@ -84,7 +84,7 @@
descr.hwAddress(),
descr.vlan(),
descr.location(),
- ImmutableSet.of(descr.ipAddress()));
+ ImmutableSet.copyOf(descr.ipAddress()));
synchronized (this) {
hosts.put(hostId, newhost);
locations.put(descr.location(), newhost);
@@ -101,12 +101,12 @@
return new HostEvent(HOST_MOVED, host);
}
- if (host.ipAddresses().contains(descr.ipAddress())) {
+ if (host.ipAddresses().containsAll(descr.ipAddress())) {
return null;
}
Set<IpPrefix> addresses = new HashSet<>(host.ipAddresses());
- addresses.add(descr.ipAddress());
+ addresses.addAll(descr.ipAddress());
StoredHost updated = new StoredHost(providerId, host.id(),
host.mac(), host.vlan(),
descr.location(), addresses);
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/HostLocationSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/HostLocationSerializer.java
new file mode 100644
index 0000000..2ef09ac
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/HostLocationSerializer.java
@@ -0,0 +1,40 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.HostLocation;
+import org.onlab.onos.net.PortNumber;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+* Kryo Serializer for {@link HostLocation}.
+*/
+public class HostLocationSerializer extends Serializer<HostLocation> {
+
+ /**
+ * Creates {@link HostLocation} serializer instance.
+ */
+ public HostLocationSerializer() {
+ // non-null, immutable
+ super(false, true);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, HostLocation object) {
+ kryo.writeClassAndObject(output, object.deviceId());
+ kryo.writeClassAndObject(output, object.port());
+ output.writeLong(object.time());
+ }
+
+ @Override
+ public HostLocation read(Kryo kryo, Input input, Class<HostLocation> type) {
+ DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+ PortNumber portNumber = (PortNumber) kryo.readClassAndObject(input);
+ long time = input.readLong();
+ return new HostLocation(deviceId, portNumber, time);
+ }
+
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
index b44c102..b229063 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
@@ -17,6 +17,8 @@
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Element;
+import org.onlab.onos.net.HostId;
+import org.onlab.onos.net.HostLocation;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
@@ -24,15 +26,20 @@
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.host.DefaultHostDescription;
+import org.onlab.onos.net.host.HostDescription;
import org.onlab.onos.net.link.DefaultLinkDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.Timestamp;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.VlanId;
import org.onlab.util.KryoPool;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
public final class KryoPoolUtil {
@@ -42,6 +49,8 @@
public static final KryoPool MISC = KryoPool.newBuilder()
.register(IpPrefix.class, new IpPrefixSerializer())
.register(IpAddress.class, new IpAddressSerializer())
+ .register(MacAddress.class, new MacAddressSerializer())
+ .register(VlanId.class)
.build();
// TODO: Populate other classes
@@ -52,6 +61,7 @@
.register(MISC)
.register(ImmutableMap.class, new ImmutableMapSerializer())
.register(ImmutableList.class, new ImmutableListSerializer())
+ .register(ImmutableSet.class, new ImmutableSetSerializer())
.register(
//
ArrayList.class,
@@ -71,8 +81,10 @@
DefaultPortDescription.class,
Element.class,
Link.Type.class,
- Timestamp.class
-
+ Timestamp.class,
+ HostId.class,
+ HostDescription.class,
+ DefaultHostDescription.class
)
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
@@ -85,6 +97,7 @@
.register(DefaultLink.class, new DefaultLinkSerializer())
.register(MastershipTerm.class, new MastershipTermSerializer())
.register(MastershipRole.class, new MastershipRoleSerializer())
+ .register(HostLocation.class, new HostLocationSerializer())
.build();
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MacAddressSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MacAddressSerializer.java
new file mode 100644
index 0000000..954c071
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MacAddressSerializer.java
@@ -0,0 +1,32 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.packet.MacAddress;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link MacAddress}.
+ */
+public class MacAddressSerializer extends Serializer<MacAddress> {
+
+ /**
+ * Creates {@link MacAddress} serializer instance.
+ */
+ public MacAddressSerializer() {
+ super(false, true);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, MacAddress object) {
+ output.writeBytes(object.getAddress());
+ }
+
+ @Override
+ public MacAddress read(Kryo kryo, Input input, Class<MacAddress> type) {
+ return MacAddress.valueOf(input.readBytes(MacAddress.MAC_ADDRESS_LENGTH));
+ }
+
+}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleHostStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleHostStore.java
index bf99227..ef80b72 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleHostStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleHostStore.java
@@ -84,7 +84,7 @@
descr.hwAddress(),
descr.vlan(),
descr.location(),
- ImmutableSet.of(descr.ipAddress()));
+ ImmutableSet.copyOf(descr.ipAddress()));
synchronized (this) {
hosts.put(hostId, newhost);
locations.put(descr.location(), newhost);
@@ -101,12 +101,12 @@
return new HostEvent(HOST_MOVED, host);
}
- if (host.ipAddresses().contains(descr.ipAddress())) {
+ if (host.ipAddresses().containsAll(descr.ipAddress())) {
return null;
}
Set<IpPrefix> addresses = new HashSet<>(host.ipAddresses());
- addresses.add(descr.ipAddress());
+ addresses.addAll(descr.ipAddress());
StoredHost updated = new StoredHost(providerId, host.id(),
host.mac(), host.vlan(),
descr.location(), addresses);