Support for optimistic replication on GossipHostStore
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/DistributedHostStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/DistributedHostStore.java
deleted file mode 100644
index 9362156..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/DistributedHostStore.java
+++ /dev/null
@@ -1,302 +0,0 @@
-package org.onlab.onos.store.host.impl;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.net.Annotations;
-import org.onlab.onos.net.ConnectPoint;
-import org.onlab.onos.net.DefaultHost;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.Host;
-import org.onlab.onos.net.HostId;
-import org.onlab.onos.net.HostLocation;
-import org.onlab.onos.net.host.HostDescription;
-import org.onlab.onos.net.host.HostEvent;
-import org.onlab.onos.net.host.HostStore;
-import org.onlab.onos.net.host.HostStoreDelegate;
-import org.onlab.onos.net.host.PortAddresses;
-import org.onlab.onos.net.provider.ProviderId;
-import org.onlab.onos.store.AbstractStore;
-import org.onlab.packet.IpPrefix;
-import org.onlab.packet.MacAddress;
-import org.onlab.packet.VlanId;
-import org.slf4j.Logger;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.onlab.onos.net.host.HostEvent.Type.*;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Manages inventory of end-station hosts using trivial in-memory
- * implementation.
- */
-//FIXME: I LIE I AM NOT DISTRIBUTED
-@Component(immediate = true)
-@Service
-public class DistributedHostStore
- extends AbstractStore<HostEvent, HostStoreDelegate>
- implements HostStore {
-
- private final Logger log = getLogger(getClass());
-
- // Host inventory
- private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(2000000, 0.75f, 16);
-
- // Hosts tracked by their location
- private final Multimap<ConnectPoint, Host> locations = HashMultimap.create();
-
- private final Map<ConnectPoint, PortAddresses> portAddresses =
- new ConcurrentHashMap<>();
-
- @Activate
- public void activate() {
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- @Override
- public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
- HostDescription hostDescription) {
- StoredHost host = hosts.get(hostId);
- if (host == null) {
- return createHost(providerId, hostId, hostDescription);
- }
- return updateHost(providerId, host, hostDescription);
- }
-
- // creates a new host and sends HOST_ADDED
- private HostEvent createHost(ProviderId providerId, HostId hostId,
- HostDescription descr) {
- StoredHost newhost = new StoredHost(providerId, hostId,
- descr.hwAddress(),
- descr.vlan(),
- descr.location(),
- ImmutableSet.of(descr.ipAddress()));
- synchronized (this) {
- hosts.put(hostId, newhost);
- locations.put(descr.location(), newhost);
- }
- return new HostEvent(HOST_ADDED, newhost);
- }
-
- // checks for type of update to host, sends appropriate event
- private HostEvent updateHost(ProviderId providerId, StoredHost host,
- HostDescription descr) {
- HostEvent event;
- if (!host.location().equals(descr.location())) {
- host.setLocation(descr.location());
- return new HostEvent(HOST_MOVED, host);
- }
-
- if (host.ipAddresses().contains(descr.ipAddress())) {
- return null;
- }
-
- Set<IpPrefix> addresses = new HashSet<>(host.ipAddresses());
- addresses.add(descr.ipAddress());
- StoredHost updated = new StoredHost(providerId, host.id(),
- host.mac(), host.vlan(),
- descr.location(), addresses);
- event = new HostEvent(HOST_UPDATED, updated);
- synchronized (this) {
- hosts.put(host.id(), updated);
- locations.remove(host.location(), host);
- locations.put(updated.location(), updated);
- }
- return event;
- }
-
- @Override
- public HostEvent removeHost(HostId hostId) {
- synchronized (this) {
- Host host = hosts.remove(hostId);
- if (host != null) {
- locations.remove((host.location()), host);
- return new HostEvent(HOST_REMOVED, host);
- }
- return null;
- }
- }
-
- @Override
- public int getHostCount() {
- return hosts.size();
- }
-
- @Override
- public Iterable<Host> getHosts() {
- return ImmutableSet.<Host>copyOf(hosts.values());
- }
-
- @Override
- public Host getHost(HostId hostId) {
- return hosts.get(hostId);
- }
-
- @Override
- public Set<Host> getHosts(VlanId vlanId) {
- Set<Host> vlanset = new HashSet<>();
- for (Host h : hosts.values()) {
- if (h.vlan().equals(vlanId)) {
- vlanset.add(h);
- }
- }
- return vlanset;
- }
-
- @Override
- public Set<Host> getHosts(MacAddress mac) {
- Set<Host> macset = new HashSet<>();
- for (Host h : hosts.values()) {
- if (h.mac().equals(mac)) {
- macset.add(h);
- }
- }
- return macset;
- }
-
- @Override
- public Set<Host> getHosts(IpPrefix ip) {
- Set<Host> ipset = new HashSet<>();
- for (Host h : hosts.values()) {
- if (h.ipAddresses().contains(ip)) {
- ipset.add(h);
- }
- }
- return ipset;
- }
-
- @Override
- public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
- return ImmutableSet.copyOf(locations.get(connectPoint));
- }
-
- @Override
- public Set<Host> getConnectedHosts(DeviceId deviceId) {
- Set<Host> hostset = new HashSet<>();
- for (ConnectPoint p : locations.keySet()) {
- if (p.deviceId().equals(deviceId)) {
- hostset.addAll(locations.get(p));
- }
- }
- return hostset;
- }
-
- @Override
- public void updateAddressBindings(PortAddresses addresses) {
- synchronized (portAddresses) {
- PortAddresses existing = portAddresses.get(addresses.connectPoint());
- if (existing == null) {
- portAddresses.put(addresses.connectPoint(), addresses);
- } else {
- Set<IpPrefix> union = Sets.union(existing.ips(), addresses.ips())
- .immutableCopy();
-
- MacAddress newMac = (addresses.mac() == null) ? existing.mac()
- : addresses.mac();
-
- PortAddresses newAddresses =
- new PortAddresses(addresses.connectPoint(), union, newMac);
-
- portAddresses.put(newAddresses.connectPoint(), newAddresses);
- }
- }
- }
-
- @Override
- public void removeAddressBindings(PortAddresses addresses) {
- synchronized (portAddresses) {
- PortAddresses existing = portAddresses.get(addresses.connectPoint());
- if (existing != null) {
- Set<IpPrefix> difference =
- Sets.difference(existing.ips(), addresses.ips()).immutableCopy();
-
- // If they removed the existing mac, set the new mac to null.
- // Otherwise, keep the existing mac.
- MacAddress newMac = existing.mac();
- if (addresses.mac() != null && addresses.mac().equals(existing.mac())) {
- newMac = null;
- }
-
- PortAddresses newAddresses =
- new PortAddresses(addresses.connectPoint(), difference, newMac);
-
- portAddresses.put(newAddresses.connectPoint(), newAddresses);
- }
- }
- }
-
- @Override
- public void clearAddressBindings(ConnectPoint connectPoint) {
- synchronized (portAddresses) {
- portAddresses.remove(connectPoint);
- }
- }
-
- @Override
- public Set<PortAddresses> getAddressBindings() {
- synchronized (portAddresses) {
- return new HashSet<>(portAddresses.values());
- }
- }
-
- @Override
- public PortAddresses getAddressBindingsForPort(ConnectPoint connectPoint) {
- PortAddresses addresses;
-
- synchronized (portAddresses) {
- addresses = portAddresses.get(connectPoint);
- }
-
- if (addresses == null) {
- addresses = new PortAddresses(connectPoint, null, null);
- }
-
- return addresses;
- }
-
- // Auxiliary extension to allow location to mutate.
- private class StoredHost extends DefaultHost {
- private HostLocation location;
-
- /**
- * Creates an end-station host using the supplied information.
- *
- * @param providerId provider identity
- * @param id host identifier
- * @param mac host MAC address
- * @param vlan host VLAN identifier
- * @param location host location
- * @param ips host IP addresses
- * @param annotations optional key/value annotations
- */
- public StoredHost(ProviderId providerId, HostId id,
- MacAddress mac, VlanId vlan, HostLocation location,
- Set<IpPrefix> ips, Annotations... annotations) {
- super(providerId, id, mac, vlan, location, ips, annotations);
- this.location = location;
- }
-
- void setLocation(HostLocation location) {
- this.location = location;
- }
-
- @Override
- public HostLocation location() {
- return location;
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
index e20af88..a1d6c72 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
@@ -29,12 +29,18 @@
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.common.impl.Timestamped;
+import org.onlab.onos.store.serializers.DistributedStoreSerializers;
+import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
+import org.onlab.util.KryoPool;
import org.slf4j.Logger;
+import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -76,6 +82,17 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
+ private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoPool.newBuilder()
+ .register(DistributedStoreSerializers.COMMON)
+ .register(InternalHostRemovedEvent.class)
+ .build()
+ .populate(1);
+ }
+ };
+
@Activate
public void activate() {
log.info("Started");
@@ -90,8 +107,18 @@
public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
HostDescription hostDescription) {
Timestamp timestamp = hostClockService.getTimestamp(hostId);
- return createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
- // TODO: tell peers.
+ HostEvent event = createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
+ if (event != null) {
+ log.info("Notifying peers of a host topology event for providerId: "
+ + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
+ try {
+ notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
+ } catch (IOException e) {
+ log.error("Failed to notify peers of a host topology event for providerId: "
+ + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
+ }
+ }
+ return event;
}
private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
@@ -157,8 +184,16 @@
@Override
public HostEvent removeHost(HostId hostId) {
Timestamp timestamp = hostClockService.getTimestamp(hostId);
- return removeHostInternal(hostId, timestamp);
- // TODO: tell peers
+ HostEvent event = removeHostInternal(hostId, timestamp);
+ if (event != null) {
+ log.info("Notifying peers of a host removed topology event for hostId: {}", hostId);
+ try {
+ notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
+ } catch (IOException e) {
+ log.info("Failed to notify peers of a host removed topology event for hostId: {}", hostId);
+ }
+ }
+ return event;
}
private HostEvent removeHostInternal(HostId hostId, Timestamp timestamp) {
@@ -341,4 +376,20 @@
return location.value();
}
}
+
+ private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
+ broadcastMessage(GossipHostStoreMessageSubjects.HOST_REMOVED, event);
+ }
+
+ private void notifyPeers(InternalHostEvent event) throws IOException {
+ broadcastMessage(GossipHostStoreMessageSubjects.HOST_UPDATED, event);
+ }
+
+ private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ subject,
+ SERIALIZER.encode(event));
+ clusterCommunicator.broadcast(message);
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStoreMessageSubjects.java
new file mode 100644
index 0000000..27cf4ce
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStoreMessageSubjects.java
@@ -0,0 +1,9 @@
+package org.onlab.onos.store.host.impl;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+public final class GossipHostStoreMessageSubjects {
+ private GossipHostStoreMessageSubjects() {}
+ public static final MessageSubject HOST_UPDATED = new MessageSubject("peer-host-updated");
+ public static final MessageSubject HOST_REMOVED = new MessageSubject("peer-host-removed");
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/InternalHostEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/InternalHostEvent.java
new file mode 100644
index 0000000..f5ca63e
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/InternalHostEvent.java
@@ -0,0 +1,51 @@
+package org.onlab.onos.store.host.impl;
+
+import org.onlab.onos.net.HostId;
+import org.onlab.onos.net.host.HostDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Information published by GossipHostStore to notify peers of a host
+ * change (create/update) event.
+ */
+public class InternalHostEvent {
+
+ private final ProviderId providerId;
+ private final HostId hostId;
+ private final HostDescription hostDescription;
+ private final Timestamp timestamp;
+
+ public InternalHostEvent(ProviderId providerId, HostId hostId,
+ HostDescription hostDescription, Timestamp timestamp) {
+ this.providerId = providerId;
+ this.hostId = hostId;
+ this.hostDescription = hostDescription;
+ this.timestamp = timestamp;
+ }
+
+ public ProviderId providerId() {
+ return providerId;
+ }
+
+ public HostId hostId() {
+ return hostId;
+ }
+
+ public HostDescription hostDescription() {
+ return hostDescription;
+ }
+
+ public Timestamp timestamp() {
+ return timestamp;
+ }
+
+ // Needed for serialization.
+ @SuppressWarnings("unused")
+ private InternalHostEvent() {
+ providerId = null;
+ hostId = null;
+ hostDescription = null;
+ timestamp = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/InternalHostRemovedEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/InternalHostRemovedEvent.java
new file mode 100644
index 0000000..8dd3b44
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/InternalHostRemovedEvent.java
@@ -0,0 +1,34 @@
+package org.onlab.onos.store.host.impl;
+
+import org.onlab.onos.net.HostId;
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Information published by GossipHostStore to notify peers of a host
+ * removed event.
+ */
+public class InternalHostRemovedEvent {
+
+ private final HostId hostId;
+ private final Timestamp timestamp;
+
+ public InternalHostRemovedEvent(HostId hostId, Timestamp timestamp) {
+ this.hostId = hostId;
+ this.timestamp = timestamp;
+ }
+
+ public HostId hostId() {
+ return hostId;
+ }
+
+ public Timestamp timestamp() {
+ return timestamp;
+ }
+
+ // for serialization.
+ @SuppressWarnings("unused")
+ private InternalHostRemovedEvent() {
+ hostId = null;
+ timestamp = null;
+ }
+}