Support for optimistic replication on GossipHostStore
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/DistributedHostStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/DistributedHostStore.java
deleted file mode 100644
index 9362156..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/DistributedHostStore.java
+++ /dev/null
@@ -1,302 +0,0 @@
-package org.onlab.onos.store.host.impl;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.net.Annotations;
-import org.onlab.onos.net.ConnectPoint;
-import org.onlab.onos.net.DefaultHost;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.Host;
-import org.onlab.onos.net.HostId;
-import org.onlab.onos.net.HostLocation;
-import org.onlab.onos.net.host.HostDescription;
-import org.onlab.onos.net.host.HostEvent;
-import org.onlab.onos.net.host.HostStore;
-import org.onlab.onos.net.host.HostStoreDelegate;
-import org.onlab.onos.net.host.PortAddresses;
-import org.onlab.onos.net.provider.ProviderId;
-import org.onlab.onos.store.AbstractStore;
-import org.onlab.packet.IpPrefix;
-import org.onlab.packet.MacAddress;
-import org.onlab.packet.VlanId;
-import org.slf4j.Logger;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.onlab.onos.net.host.HostEvent.Type.*;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Manages inventory of end-station hosts using trivial in-memory
- * implementation.
- */
-//FIXME: I LIE I AM NOT DISTRIBUTED
-@Component(immediate = true)
-@Service
-public class DistributedHostStore
-        extends AbstractStore<HostEvent, HostStoreDelegate>
-        implements HostStore {
-
-    private final Logger log = getLogger(getClass());
-
-    // Host inventory
-    private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(2000000, 0.75f, 16);
-
-    // Hosts tracked by their location
-    private final Multimap<ConnectPoint, Host> locations = HashMultimap.create();
-
-    private final Map<ConnectPoint, PortAddresses> portAddresses =
-            new ConcurrentHashMap<>();
-
-    @Activate
-    public void activate() {
-        log.info("Started");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        log.info("Stopped");
-    }
-
-    @Override
-    public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
-                                        HostDescription hostDescription) {
-        StoredHost host = hosts.get(hostId);
-        if (host == null) {
-            return createHost(providerId, hostId, hostDescription);
-        }
-        return updateHost(providerId, host, hostDescription);
-    }
-
-    // creates a new host and sends HOST_ADDED
-    private HostEvent createHost(ProviderId providerId, HostId hostId,
-                                 HostDescription descr) {
-        StoredHost newhost = new StoredHost(providerId, hostId,
-                                            descr.hwAddress(),
-                                            descr.vlan(),
-                                            descr.location(),
-                                            ImmutableSet.of(descr.ipAddress()));
-        synchronized (this) {
-            hosts.put(hostId, newhost);
-            locations.put(descr.location(), newhost);
-        }
-        return new HostEvent(HOST_ADDED, newhost);
-    }
-
-    // checks for type of update to host, sends appropriate event
-    private HostEvent updateHost(ProviderId providerId, StoredHost host,
-                                 HostDescription descr) {
-        HostEvent event;
-        if (!host.location().equals(descr.location())) {
-            host.setLocation(descr.location());
-            return new HostEvent(HOST_MOVED, host);
-        }
-
-        if (host.ipAddresses().contains(descr.ipAddress())) {
-            return null;
-        }
-
-        Set<IpPrefix> addresses = new HashSet<>(host.ipAddresses());
-        addresses.add(descr.ipAddress());
-        StoredHost updated = new StoredHost(providerId, host.id(),
-                                            host.mac(), host.vlan(),
-                                            descr.location(), addresses);
-        event = new HostEvent(HOST_UPDATED, updated);
-        synchronized (this) {
-            hosts.put(host.id(), updated);
-            locations.remove(host.location(), host);
-            locations.put(updated.location(), updated);
-        }
-        return event;
-    }
-
-    @Override
-    public HostEvent removeHost(HostId hostId) {
-        synchronized (this) {
-            Host host = hosts.remove(hostId);
-            if (host != null) {
-                locations.remove((host.location()), host);
-                return new HostEvent(HOST_REMOVED, host);
-            }
-            return null;
-        }
-    }
-
-    @Override
-    public int getHostCount() {
-        return hosts.size();
-    }
-
-    @Override
-    public Iterable<Host> getHosts() {
-        return ImmutableSet.<Host>copyOf(hosts.values());
-    }
-
-    @Override
-    public Host getHost(HostId hostId) {
-        return hosts.get(hostId);
-    }
-
-    @Override
-    public Set<Host> getHosts(VlanId vlanId) {
-        Set<Host> vlanset = new HashSet<>();
-        for (Host h : hosts.values()) {
-            if (h.vlan().equals(vlanId)) {
-                vlanset.add(h);
-            }
-        }
-        return vlanset;
-    }
-
-    @Override
-    public Set<Host> getHosts(MacAddress mac) {
-        Set<Host> macset = new HashSet<>();
-        for (Host h : hosts.values()) {
-            if (h.mac().equals(mac)) {
-                macset.add(h);
-            }
-        }
-        return macset;
-    }
-
-    @Override
-    public Set<Host> getHosts(IpPrefix ip) {
-        Set<Host> ipset = new HashSet<>();
-        for (Host h : hosts.values()) {
-            if (h.ipAddresses().contains(ip)) {
-                ipset.add(h);
-            }
-        }
-        return ipset;
-    }
-
-    @Override
-    public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
-        return ImmutableSet.copyOf(locations.get(connectPoint));
-    }
-
-    @Override
-    public Set<Host> getConnectedHosts(DeviceId deviceId) {
-        Set<Host> hostset = new HashSet<>();
-        for (ConnectPoint p : locations.keySet()) {
-            if (p.deviceId().equals(deviceId)) {
-                hostset.addAll(locations.get(p));
-            }
-        }
-        return hostset;
-    }
-
-    @Override
-    public void updateAddressBindings(PortAddresses addresses) {
-        synchronized (portAddresses) {
-            PortAddresses existing = portAddresses.get(addresses.connectPoint());
-            if (existing == null) {
-                portAddresses.put(addresses.connectPoint(), addresses);
-            } else {
-                Set<IpPrefix> union = Sets.union(existing.ips(), addresses.ips())
-                        .immutableCopy();
-
-                MacAddress newMac = (addresses.mac() == null) ? existing.mac()
-                        : addresses.mac();
-
-                PortAddresses newAddresses =
-                        new PortAddresses(addresses.connectPoint(), union, newMac);
-
-                portAddresses.put(newAddresses.connectPoint(), newAddresses);
-            }
-        }
-    }
-
-    @Override
-    public void removeAddressBindings(PortAddresses addresses) {
-        synchronized (portAddresses) {
-            PortAddresses existing = portAddresses.get(addresses.connectPoint());
-            if (existing != null) {
-                Set<IpPrefix> difference =
-                        Sets.difference(existing.ips(), addresses.ips()).immutableCopy();
-
-                // If they removed the existing mac, set the new mac to null.
-                // Otherwise, keep the existing mac.
-                MacAddress newMac = existing.mac();
-                if (addresses.mac() != null && addresses.mac().equals(existing.mac())) {
-                    newMac = null;
-                }
-
-                PortAddresses newAddresses =
-                        new PortAddresses(addresses.connectPoint(), difference, newMac);
-
-                portAddresses.put(newAddresses.connectPoint(), newAddresses);
-            }
-        }
-    }
-
-    @Override
-    public void clearAddressBindings(ConnectPoint connectPoint) {
-        synchronized (portAddresses) {
-            portAddresses.remove(connectPoint);
-        }
-    }
-
-    @Override
-    public Set<PortAddresses> getAddressBindings() {
-        synchronized (portAddresses) {
-            return new HashSet<>(portAddresses.values());
-        }
-    }
-
-    @Override
-    public PortAddresses getAddressBindingsForPort(ConnectPoint connectPoint) {
-        PortAddresses addresses;
-
-        synchronized (portAddresses) {
-            addresses = portAddresses.get(connectPoint);
-        }
-
-        if (addresses == null) {
-            addresses = new PortAddresses(connectPoint, null, null);
-        }
-
-        return addresses;
-    }
-
-    // Auxiliary extension to allow location to mutate.
-    private class StoredHost extends DefaultHost {
-        private HostLocation location;
-
-        /**
-         * Creates an end-station host using the supplied information.
-         *
-         * @param providerId  provider identity
-         * @param id          host identifier
-         * @param mac         host MAC address
-         * @param vlan        host VLAN identifier
-         * @param location    host location
-         * @param ips         host IP addresses
-         * @param annotations optional key/value annotations
-         */
-        public StoredHost(ProviderId providerId, HostId id,
-                          MacAddress mac, VlanId vlan, HostLocation location,
-                          Set<IpPrefix> ips, Annotations... annotations) {
-            super(providerId, id, mac, vlan, location, ips, annotations);
-            this.location = location;
-        }
-
-        void setLocation(HostLocation location) {
-            this.location = location;
-        }
-
-        @Override
-        public HostLocation location() {
-            return location;
-        }
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
index e20af88..a1d6c72 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
@@ -29,12 +29,18 @@
 import org.onlab.onos.store.AbstractStore;
 import org.onlab.onos.store.Timestamp;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.common.impl.Timestamped;
+import org.onlab.onos.store.serializers.DistributedStoreSerializers;
+import org.onlab.onos.store.serializers.KryoSerializer;
 import org.onlab.packet.IpPrefix;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
+import org.onlab.util.KryoPool;
 import org.slf4j.Logger;
 
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -76,6 +82,17 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoPool.newBuilder()
+                    .register(DistributedStoreSerializers.COMMON)
+                    .register(InternalHostRemovedEvent.class)
+                    .build()
+                    .populate(1);
+        }
+    };
+
     @Activate
     public void activate() {
         log.info("Started");
@@ -90,8 +107,18 @@
     public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
                                         HostDescription hostDescription) {
         Timestamp timestamp = hostClockService.getTimestamp(hostId);
-        return createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
-        // TODO: tell peers.
+        HostEvent event = createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
+        if (event != null) {
+            log.info("Notifying peers of a host topology event for providerId: "
+                    + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
+            try {
+                notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
+            } catch (IOException e) {
+                log.error("Failed to notify peers of a host topology event for providerId: "
+                        + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
+            }
+        }
+        return event;
     }
 
     private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
@@ -157,8 +184,16 @@
     @Override
     public HostEvent removeHost(HostId hostId) {
         Timestamp timestamp = hostClockService.getTimestamp(hostId);
-        return removeHostInternal(hostId, timestamp);
-        // TODO: tell peers
+        HostEvent event = removeHostInternal(hostId, timestamp);
+        if (event != null) {
+            log.info("Notifying peers of a host removed topology event for hostId: {}", hostId);
+            try {
+                notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
+            } catch (IOException e) {
+                log.info("Failed to notify peers of a host removed topology event for hostId: {}", hostId);
+            }
+        }
+        return event;
     }
 
     private HostEvent removeHostInternal(HostId hostId, Timestamp timestamp) {
@@ -341,4 +376,20 @@
             return location.value();
         }
     }
+
+    private void notifyPeers(InternalHostRemovedEvent event) throws IOException {
+        broadcastMessage(GossipHostStoreMessageSubjects.HOST_REMOVED, event);
+    }
+
+    private void notifyPeers(InternalHostEvent event) throws IOException {
+        broadcastMessage(GossipHostStoreMessageSubjects.HOST_UPDATED, event);
+    }
+
+    private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                subject,
+                SERIALIZER.encode(event));
+        clusterCommunicator.broadcast(message);
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStoreMessageSubjects.java
new file mode 100644
index 0000000..27cf4ce
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStoreMessageSubjects.java
@@ -0,0 +1,9 @@
+package org.onlab.onos.store.host.impl;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+public final class GossipHostStoreMessageSubjects {
+    private GossipHostStoreMessageSubjects() {}
+    public static final MessageSubject HOST_UPDATED = new MessageSubject("peer-host-updated");
+    public static final MessageSubject HOST_REMOVED = new MessageSubject("peer-host-removed");
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/InternalHostEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/InternalHostEvent.java
new file mode 100644
index 0000000..f5ca63e
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/InternalHostEvent.java
@@ -0,0 +1,51 @@
+package org.onlab.onos.store.host.impl;
+
+import org.onlab.onos.net.HostId;
+import org.onlab.onos.net.host.HostDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Information published by GossipHostStore to notify peers of a host
+ * change (create/update) event.
+ */
+public class InternalHostEvent {
+
+    private final ProviderId providerId;
+    private final HostId hostId;
+    private final HostDescription hostDescription;
+    private final Timestamp timestamp;
+
+    public InternalHostEvent(ProviderId providerId, HostId hostId,
+                             HostDescription hostDescription, Timestamp timestamp) {
+        this.providerId = providerId;
+        this.hostId = hostId;
+        this.hostDescription = hostDescription;
+        this.timestamp = timestamp;
+    }
+
+    public ProviderId providerId() {
+        return providerId;
+    }
+
+    public HostId hostId() {
+        return hostId;
+    }
+
+    public HostDescription hostDescription() {
+        return hostDescription;
+    }
+
+    public Timestamp timestamp() {
+        return timestamp;
+    }
+
+    // Needed for serialization.
+    @SuppressWarnings("unused")
+    private InternalHostEvent() {
+        providerId = null;
+        hostId = null;
+        hostDescription = null;
+        timestamp = null;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/InternalHostRemovedEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/InternalHostRemovedEvent.java
new file mode 100644
index 0000000..8dd3b44
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/InternalHostRemovedEvent.java
@@ -0,0 +1,34 @@
+package org.onlab.onos.store.host.impl;
+
+import org.onlab.onos.net.HostId;
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Information published by GossipHostStore to notify peers of a host
+ * removed event.
+ */
+public class InternalHostRemovedEvent {
+
+    private final HostId hostId;
+    private final Timestamp timestamp;
+
+    public InternalHostRemovedEvent(HostId hostId, Timestamp timestamp) {
+        this.hostId = hostId;
+        this.timestamp = timestamp;
+    }
+
+    public HostId hostId() {
+        return hostId;
+    }
+
+    public Timestamp timestamp() {
+        return timestamp;
+    }
+
+    // for serialization.
+    @SuppressWarnings("unused")
+    private InternalHostRemovedEvent() {
+        hostId = null;
+        timestamp = null;
+    }
+}