Replaced GossipHostStore with a implementation built on top of EventuallyConsistentMap
Change-Id: I6b580727e5f4bb03e606c87a6748e6fbb90223e7
diff --git a/core/api/src/main/java/org/onosproject/net/host/HostClockService.java b/core/api/src/main/java/org/onosproject/net/host/HostClockService.java
deleted file mode 100644
index 1d88b87..0000000
--- a/core/api/src/main/java/org/onosproject/net/host/HostClockService.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.net.host;
-
-import org.onosproject.net.HostId;
-import org.onosproject.store.Timestamp;
-
-/**
- * Interface for a logical clock service that issues per host timestamps.
- */
-public interface HostClockService {
-
- /**
- * Returns a new timestamp for the specified host.
- * @param hostId identifier for the host.
- * @return timestamp.
- */
- Timestamp getTimestamp(HostId hostId);
-}
diff --git a/core/security/impl/src/main/java/org/onosproject/security/impl/PolicyBuilder.java b/core/security/impl/src/main/java/org/onosproject/security/impl/PolicyBuilder.java
index ddf720b..2d12633 100644
--- a/core/security/impl/src/main/java/org/onosproject/security/impl/PolicyBuilder.java
+++ b/core/security/impl/src/main/java/org/onosproject/security/impl/PolicyBuilder.java
@@ -27,7 +27,6 @@
import org.onosproject.net.group.GroupService;
import org.onosproject.net.host.HostAdminService;
import org.onosproject.net.host.HostService;
-import org.onosproject.net.host.HostClockService;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.IntentExtensionService;
import org.onosproject.net.intent.IntentClockService;
@@ -136,8 +135,6 @@
new PermissionInfo(ServicePermission.class.getName(),
HostService.class.getName(), ServicePermission.GET),
new PermissionInfo(ServicePermission.class.getName(),
- HostClockService.class.getName(), ServicePermission.GET),
- new PermissionInfo(ServicePermission.class.getName(),
IntentService.class.getName(), ServicePermission.GET),
new PermissionInfo(ServicePermission.class.getName(),
IntentClockService.class.getName(), ServicePermission.GET),
@@ -209,8 +206,6 @@
GroupService.class.getName()));
serviceDirectory.put(Permission.GROUP_EVENT, ImmutableSet.of(
GroupService.class.getName()));
- serviceDirectory.put(Permission.HOST_READ, ImmutableSet.of(
- HostService.class.getName(), HostClockService.class.getName()));
serviceDirectory.put(Permission.HOST_WRITE, ImmutableSet.of(
HostService.class.getName()));
serviceDirectory.put(Permission.HOST_EVENT, ImmutableSet.of(
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
new file mode 100644
index 0000000..637444c
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/ECHostStore.java
@@ -0,0 +1,263 @@
+package org.onosproject.store.host.impl;
+
+import static org.onosproject.net.DefaultAnnotations.merge;
+import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
+import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.Annotations;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultHost;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Host;
+import org.onosproject.net.HostId;
+import org.onosproject.net.host.HostDescription;
+import org.onosproject.net.host.HostEvent;
+import org.onosproject.net.host.HostStore;
+import org.onosproject.net.host.HostStoreDelegate;
+import org.onosproject.net.host.PortAddresses;
+import org.onosproject.net.host.HostEvent.Type;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.LogicalClockService;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import static com.google.common.collect.Multimaps.newSetMultimap;
+import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
+import static com.google.common.collect.Sets.newConcurrentHashSet;
+
+/**
+ * Manages the inventory of hosts using a {@code EventuallyConsistentMap}.
+ */
+@Component(immediate = true)
+@Service
+public class ECHostStore
+ extends AbstractStore<HostEvent, HostStoreDelegate>
+ implements HostStore {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LogicalClockService clockService;
+
+ // Hosts tracked by their location
+ private final Multimap<ConnectPoint, Host> locations
+ = synchronizedSetMultimap(newSetMultimap(new ConcurrentHashMap<>(),
+ () -> newConcurrentHashSet()));
+ private final SetMultimap<ConnectPoint, PortAddresses> portAddresses =
+ Multimaps.synchronizedSetMultimap(
+ HashMultimap.<ConnectPoint, PortAddresses>create());
+
+ private EventuallyConsistentMap<HostId, DefaultHost> hosts;
+
+ private EventuallyConsistentMapListener<HostId, DefaultHost> hostLocationTracker =
+ new HostLocationTracker();
+
+ @Activate
+ public void activate() {
+ KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API);
+
+ hosts = storageService.<HostId, DefaultHost>eventuallyConsistentMapBuilder()
+ .withName("onos-hosts")
+ .withSerializer(hostSerializer)
+ .withTimestampProvider((k, v) -> clockService.getTimestamp())
+ .build();
+
+ hosts.addListener(hostLocationTracker);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ hosts.removeListener(hostLocationTracker);
+ hosts.destroy();
+ locations.clear();
+ portAddresses.clear();
+
+ log.info("Stopped");
+ }
+
+ @Override
+ public HostEvent createOrUpdateHost(ProviderId providerId,
+ HostId hostId,
+ HostDescription hostDescription) {
+ DefaultHost currentHost = hosts.get(hostId);
+ if (currentHost == null) {
+ DefaultHost newhost = new DefaultHost(
+ providerId,
+ hostId,
+ hostDescription.hwAddress(),
+ hostDescription.vlan(),
+ hostDescription.location(),
+ ImmutableSet.copyOf(hostDescription.ipAddress()));
+ hosts.put(hostId, newhost);
+ return new HostEvent(HOST_ADDED, newhost);
+ }
+ return updateHost(providerId, hostId, hostDescription, currentHost);
+ }
+
+ @Override
+ public HostEvent removeHost(HostId hostId) {
+ Host host = hosts.remove(hostId);
+ return host != null ? new HostEvent(HOST_REMOVED, host) : null;
+ }
+
+ @Override
+ public int getHostCount() {
+ return hosts.size();
+ }
+
+ @Override
+ public Iterable<Host> getHosts() {
+ return ImmutableSet.copyOf(hosts.values());
+ }
+
+ @Override
+ public Host getHost(HostId hostId) {
+ return hosts.get(hostId);
+ }
+
+ @Override
+ public Set<Host> getHosts(VlanId vlanId) {
+ return filter(hosts.values(), host -> Objects.equals(host.vlan(), vlanId));
+ }
+
+ @Override
+ public Set<Host> getHosts(MacAddress mac) {
+ return filter(hosts.values(), host -> Objects.equals(host.mac(), mac));
+ }
+
+ @Override
+ public Set<Host> getHosts(IpAddress ip) {
+ return filter(hosts.values(), host -> host.ipAddresses().contains(ip));
+ }
+
+ @Override
+ public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
+ return ImmutableSet.copyOf(locations.get(connectPoint));
+ }
+
+ @Override
+ public Set<Host> getConnectedHosts(DeviceId deviceId) {
+ return locations.entries()
+ .stream()
+ .filter(entry -> entry.getKey().deviceId().equals(deviceId))
+ .map(entry -> entry.getValue())
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public void updateAddressBindings(PortAddresses addresses) {
+ portAddresses.put(addresses.connectPoint(), addresses);
+ }
+
+ @Override
+ public void removeAddressBindings(PortAddresses addresses) {
+ portAddresses.remove(addresses.connectPoint(), addresses);
+ }
+
+ @Override
+ public void clearAddressBindings(ConnectPoint connectPoint) {
+ portAddresses.removeAll(connectPoint);
+ }
+
+ @Override
+ public Set<PortAddresses> getAddressBindings() {
+ return ImmutableSet.copyOf(portAddresses.values());
+ }
+
+ @Override
+ public Set<PortAddresses> getAddressBindingsForPort(ConnectPoint connectPoint) {
+ synchronized (portAddresses) {
+ Set<PortAddresses> addresses = portAddresses.get(connectPoint);
+ return addresses == null ? Collections.emptySet() : ImmutableSet.copyOf(addresses);
+ }
+ }
+
+ private Set<Host> filter(Collection<DefaultHost> collection, Predicate<DefaultHost> predicate) {
+ return collection.stream().filter(predicate).collect(Collectors.toSet());
+ }
+
+ // checks for type of update to host, sends appropriate event
+ private HostEvent updateHost(ProviderId providerId,
+ HostId hostId,
+ HostDescription descr,
+ DefaultHost currentHost) {
+
+ final boolean hostMoved = !currentHost.location().equals(descr.location());
+ if (hostMoved ||
+ !currentHost.ipAddresses().containsAll(descr.ipAddress()) ||
+ !descr.annotations().keys().isEmpty()) {
+
+ Set<IpAddress> addresses = Sets.newHashSet(currentHost.ipAddresses());
+ addresses.addAll(descr.ipAddress());
+ Annotations annotations = merge((DefaultAnnotations) currentHost.annotations(),
+ descr.annotations());
+
+ DefaultHost updatedHost = new DefaultHost(providerId, currentHost.id(),
+ currentHost.mac(), currentHost.vlan(),
+ descr.location(),
+ addresses,
+ annotations);
+
+ // TODO: We need a way to detect conflicting changes and abort update.
+ hosts.put(hostId, updatedHost);
+ locations.remove(currentHost.location(), currentHost);
+ locations.put(updatedHost.location(), updatedHost);
+
+ HostEvent.Type eventType = hostMoved ? Type.HOST_MOVED : Type.HOST_UPDATED;
+ return new HostEvent(eventType, updatedHost);
+ }
+ return null;
+ }
+
+ private class HostLocationTracker implements EventuallyConsistentMapListener<HostId, DefaultHost> {
+
+ @Override
+ public void event(EventuallyConsistentMapEvent<HostId, DefaultHost> event) {
+ DefaultHost host = event.value();
+ if (event.type() == PUT) {
+ locations.put(host.location(), host);
+ } else if (event.type() == REMOVE) {
+ locations.remove(host.location(), host);
+ }
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
deleted file mode 100644
index b2e4b97..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStore.java
+++ /dev/null
@@ -1,711 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.host.impl;
-
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.SetMultimap;
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.packet.IpAddress;
-import org.onlab.packet.MacAddress;
-import org.onlab.packet.VlanId;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.net.Annotations;
-import org.onosproject.net.AnnotationsUtil;
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.DefaultAnnotations;
-import org.onosproject.net.DefaultHost;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.Host;
-import org.onosproject.net.HostId;
-import org.onosproject.net.HostLocation;
-import org.onosproject.net.host.DefaultHostDescription;
-import org.onosproject.net.host.HostClockService;
-import org.onosproject.net.host.HostDescription;
-import org.onosproject.net.host.HostEvent;
-import org.onosproject.net.host.HostEvent.Type;
-import org.onosproject.net.host.HostStore;
-import org.onosproject.net.host.HostStoreDelegate;
-import org.onosproject.net.host.PortAddresses;
-import org.onosproject.net.provider.ProviderId;
-import org.onosproject.store.AbstractStore;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.impl.Timestamped;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Multimaps.newSetMultimap;
-import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
-import static com.google.common.collect.Sets.newConcurrentHashSet;
-import static java.util.concurrent.Executors.newCachedThreadPool;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onlab.util.Tools.minPriority;
-import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
-import static org.onosproject.net.DefaultAnnotations.merge;
-import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
-import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
-import static org.onosproject.store.host.impl.GossipHostStoreMessageSubjects.*;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Manages inventory of end-station hosts in distributed data store
- * that uses optimistic replication and gossip based techniques.
- */
-@Component(immediate = true)
-@Service
-public class GossipHostStore
- extends AbstractStore<HostEvent, HostStoreDelegate>
- implements HostStore {
-
- private final Logger log = getLogger(getClass());
-
- // TODO: make this configurable
- private int hostsExpected = 2000000;
-
- // Host inventory
- private final Map<HostId, StoredHost> hosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
-
- private final Map<HostId, Timestamped<Host>> removedHosts = new ConcurrentHashMap<>(hostsExpected, 0.75f, 16);
-
- // Hosts tracked by their location
- private final Multimap<ConnectPoint, Host> locations
- = synchronizedSetMultimap(newSetMultimap(new ConcurrentHashMap<>(),
- () -> newConcurrentHashSet()));
-
- private final SetMultimap<ConnectPoint, PortAddresses> portAddresses =
- Multimaps.synchronizedSetMultimap(
- HashMultimap.<ConnectPoint, PortAddresses>create());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected HostClockService hostClockService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- private static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(DistributedStoreSerializers.STORE_COMMON)
- .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
- .register(InternalHostEvent.class)
- .register(InternalHostRemovedEvent.class)
- .register(HostFragmentId.class)
- .register(HostAntiEntropyAdvertisement.class)
- .build();
- }
- };
-
- private ExecutorService executor;
-
- private ScheduledExecutorService backgroundExecutor;
-
- // TODO: Make these anti-entropy params configurable
- private long initialDelaySec = 5;
- private long periodSec = 5;
-
- @Activate
- public void activate() {
-
- executor = newCachedThreadPool(groupedThreads("onos/host", "fg-%d"));
-
- backgroundExecutor =
- newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/host", "bg-%d")));
-
- clusterCommunicator.addSubscriber(
- HOST_UPDATED_MSG,
- new InternalHostEventListener(), executor);
- clusterCommunicator.addSubscriber(
- HOST_REMOVED_MSG,
- new InternalHostRemovedEventListener(), executor);
- clusterCommunicator.addSubscriber(
- HOST_ANTI_ENTROPY_ADVERTISEMENT,
- new InternalHostAntiEntropyAdvertisementListener(), backgroundExecutor);
-
- // start anti-entropy thread
- backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
- initialDelaySec, periodSec, TimeUnit.SECONDS);
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- executor.shutdownNow();
- backgroundExecutor.shutdownNow();
- try {
- if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
- log.error("Timeout during executor shutdown");
- }
- } catch (InterruptedException e) {
- log.error("Error during executor shutdown", e);
- }
-
- hosts.clear();
- removedHosts.clear();
- locations.clear();
- portAddresses.clear();
-
- log.info("Stopped");
- }
-
- @Override
- public HostEvent createOrUpdateHost(ProviderId providerId, HostId hostId,
- HostDescription hostDescription) {
- Timestamp timestamp = hostClockService.getTimestamp(hostId);
- HostEvent event = createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp);
- if (event != null) {
- log.debug("Notifying peers of a host topology event for providerId: "
- + "{}; hostId: {}; hostDescription: {}", providerId, hostId, hostDescription);
- notifyPeers(new InternalHostEvent(providerId, hostId, hostDescription, timestamp));
- }
- return event;
- }
-
- private HostEvent createOrUpdateHostInternal(ProviderId providerId, HostId hostId,
- HostDescription hostDescription, Timestamp timestamp) {
- // If this host was previously removed, first ensure
- // this new request is "newer"
- if (isHostRemoved(hostId, timestamp)) {
- log.debug("Ignoring update for removed host {}@{}",
- hostDescription, timestamp);
- return null;
- }
- StoredHost host = hosts.get(hostId);
- if (host == null) {
- return createHost(providerId, hostId, hostDescription, timestamp);
- }
- return updateHost(providerId, hostId, host, hostDescription, timestamp);
- }
-
- /**
- * @param hostId host identifier
- * @param timestamp timstamp to compare with
- * @return true if given timestamp is more recent timestamp compared to
- * the timestamp Host was removed.
- */
- private boolean isHostRemoved(HostId hostId, Timestamp timestamp) {
- Timestamped<Host> removedInfo = removedHosts.get(hostId);
- if (removedInfo != null) {
- if (removedInfo.isNewerThan(timestamp)) {
- return true;
- }
- removedHosts.remove(hostId, removedInfo);
- }
- return false;
- }
-
- // creates a new host and sends HOST_ADDED
- private HostEvent createHost(ProviderId providerId, HostId hostId,
- HostDescription descr, Timestamp timestamp) {
- synchronized (this) {
- StoredHost newhost = new StoredHost(timestamp, providerId, hostId,
- descr.hwAddress(),
- descr.vlan(),
- descr.location(),
- ImmutableSet.copyOf(descr.ipAddress()));
- StoredHost concAdd = hosts.putIfAbsent(hostId, newhost);
- if (concAdd != null) {
- // concurrent add detected, retry from start
- return updateHost(providerId, hostId, concAdd, descr, timestamp);
- }
- locations.put(descr.location(), newhost);
- return new HostEvent(HOST_ADDED, newhost);
- }
- }
-
- // checks for type of update to host, sends appropriate event
- private HostEvent updateHost(ProviderId providerId, HostId hostId, StoredHost oldHost,
- HostDescription descr, Timestamp timestamp) {
-
- if (timestamp.compareTo(oldHost.timestamp()) < 0) {
- // new timestamp is older
- log.debug("Ignoring outdated host update {}@{}", descr, timestamp);
- return null;
- }
-
- final boolean hostMoved = !oldHost.location().equals(descr.location());
- if (hostMoved ||
- !oldHost.ipAddresses().containsAll(descr.ipAddress()) ||
- !descr.annotations().keys().isEmpty()) {
-
- Set<IpAddress> addresses = new HashSet<>(oldHost.ipAddresses());
- addresses.addAll(descr.ipAddress());
- Annotations annotations = merge((DefaultAnnotations) oldHost.annotations(),
- descr.annotations());
-
- Timestamp newTimestamp = timestamp;
- // if merged Set/Annotation differ from description...
- final boolean deltaUpdate = !descr.ipAddress().equals(addresses) ||
- !AnnotationsUtil.isEqual(descr.annotations(), annotations);
- if (deltaUpdate) {
- // ..then local existing info had something description didn't
- newTimestamp = hostClockService.getTimestamp(hostId);
- log.debug("delta update detected on {}, substepping timestamp to {}",
- hostId, newTimestamp);
- }
-
- StoredHost updated = new StoredHost(newTimestamp,
- providerId, oldHost.id(),
- oldHost.mac(), oldHost.vlan(),
- descr.location(),
- addresses,
- annotations);
- synchronized (this) {
- boolean replaced = hosts.replace(hostId, oldHost, updated);
- if (!replaced) {
- // concurrent update, retry
- return createOrUpdateHostInternal(providerId, hostId, descr, timestamp);
- }
- locations.remove(oldHost.location(), oldHost);
- locations.put(updated.location(), updated);
-
- HostEvent.Type eventType;
- if (hostMoved) {
- eventType = Type.HOST_MOVED;
- } else {
- eventType = Type.HOST_UPDATED;
- }
- return new HostEvent(eventType, updated);
- }
- }
- return null;
- }
-
- @Override
- public HostEvent removeHost(HostId hostId) {
- Timestamp timestamp = hostClockService.getTimestamp(hostId);
- HostEvent event = removeHostInternal(hostId, timestamp);
- if (event != null) {
- log.debug("Notifying peers of a host removed topology event for hostId: {}", hostId);
- notifyPeers(new InternalHostRemovedEvent(hostId, timestamp));
- }
- return event;
- }
-
- private HostEvent removeHostInternal(HostId hostId, Timestamp timestamp) {
- synchronized (this) {
- Host host = hosts.remove(hostId);
- if (host != null) {
- locations.remove((host.location()), host);
- removedHosts.put(hostId, new Timestamped<>(host, timestamp));
- return new HostEvent(HOST_REMOVED, host);
- }
- return null;
- }
- }
-
- @Override
- public int getHostCount() {
- return hosts.size();
- }
-
- @Override
- public Iterable<Host> getHosts() {
- return ImmutableSet.<Host>copyOf(hosts.values());
- }
-
- @Override
- public Host getHost(HostId hostId) {
- return hosts.get(hostId);
- }
-
- @Override
- public Set<Host> getHosts(VlanId vlanId) {
- Set<Host> vlanset = new HashSet<>();
- for (Host h : hosts.values()) {
- if (h.vlan().equals(vlanId)) {
- vlanset.add(h);
- }
- }
- return vlanset;
- }
-
- @Override
- public Set<Host> getHosts(MacAddress mac) {
- Set<Host> macset = new HashSet<>();
- for (Host h : hosts.values()) {
- if (h.mac().equals(mac)) {
- macset.add(h);
- }
- }
- return macset;
- }
-
- @Override
- public Set<Host> getHosts(IpAddress ip) {
- Set<Host> ipset = new HashSet<>();
- for (Host h : hosts.values()) {
- if (h.ipAddresses().contains(ip)) {
- ipset.add(h);
- }
- }
- return ipset;
- }
-
- @Override
- public Set<Host> getConnectedHosts(ConnectPoint connectPoint) {
- return ImmutableSet.copyOf(locations.get(connectPoint));
- }
-
- @Override
- public Set<Host> getConnectedHosts(DeviceId deviceId) {
- Set<Host> hostset = new HashSet<>();
- for (ConnectPoint p : locations.keySet()) {
- if (p.deviceId().equals(deviceId)) {
- hostset.addAll(locations.get(p));
- }
- }
- return hostset;
- }
-
- @Override
- public void updateAddressBindings(PortAddresses addresses) {
- portAddresses.put(addresses.connectPoint(), addresses);
- }
-
- @Override
- public void removeAddressBindings(PortAddresses addresses) {
- portAddresses.remove(addresses.connectPoint(), addresses);
- }
-
- @Override
- public void clearAddressBindings(ConnectPoint connectPoint) {
- portAddresses.removeAll(connectPoint);
- }
-
- @Override
- public Set<PortAddresses> getAddressBindings() {
- synchronized (portAddresses) {
- return ImmutableSet.copyOf(portAddresses.values());
- }
- }
-
- @Override
- public Set<PortAddresses> getAddressBindingsForPort(ConnectPoint connectPoint) {
- synchronized (portAddresses) {
- Set<PortAddresses> addresses = portAddresses.get(connectPoint);
-
- if (addresses == null) {
- return Collections.emptySet();
- } else {
- return ImmutableSet.copyOf(addresses);
- }
- }
- }
-
- private static final class StoredHost extends DefaultHost {
- private final Timestamp timestamp;
-
- /**
- * Creates an end-station host using the supplied information.
- *
- * @param providerId provider identity
- * @param id host identifier
- * @param mac host MAC address
- * @param vlan host VLAN identifier
- * @param location host location
- * @param ips host IP addresses
- * @param annotations optional key/value annotations
- */
- public StoredHost(Timestamp timestamp, ProviderId providerId, HostId id,
- MacAddress mac, VlanId vlan, HostLocation location,
- Set<IpAddress> ips, Annotations... annotations) {
- super(providerId, id, mac, vlan, location, ips, annotations);
- this.timestamp = checkNotNull(timestamp);
- }
-
- public Timestamp timestamp() {
- return timestamp;
- }
- }
-
- private void notifyPeers(InternalHostRemovedEvent event) {
- broadcastMessage(HOST_REMOVED_MSG, event);
- }
-
- private void notifyPeers(InternalHostEvent event) {
- broadcastMessage(HOST_UPDATED_MSG, event);
- }
-
- private void broadcastMessage(MessageSubject subject, Object event) {
- clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
- }
-
- private void unicastMessage(NodeId peer,
- MessageSubject subject,
- Object event) throws IOException {
- clusterCommunicator.unicast(event, subject, SERIALIZER::encode, peer);
- }
-
- private void notifyDelegateIfNotNull(HostEvent event) {
- if (event != null) {
- notifyDelegate(event);
- }
- }
-
- private final class InternalHostEventListener
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
-
- log.debug("Received host update event from peer: {}", message.sender());
- InternalHostEvent event = SERIALIZER.decode(message.payload());
-
- ProviderId providerId = event.providerId();
- HostId hostId = event.hostId();
- HostDescription hostDescription = event.hostDescription();
- Timestamp timestamp = event.timestamp();
-
- try {
- notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId,
- hostId,
- hostDescription,
- timestamp));
- } catch (Exception e) {
- log.warn("Exception thrown handling host removed", e);
- }
- }
- }
-
- private final class InternalHostRemovedEventListener
- implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
-
- log.debug("Received host removed event from peer: {}", message.sender());
- InternalHostRemovedEvent event = SERIALIZER.decode(message.payload());
-
- HostId hostId = event.hostId();
- Timestamp timestamp = event.timestamp();
-
- try {
- notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
- } catch (Exception e) {
- log.warn("Exception thrown handling host removed", e);
- }
- }
- }
-
- private final class SendAdvertisementTask implements Runnable {
-
- @Override
- public void run() {
- if (Thread.currentThread().isInterrupted()) {
- log.info("Interrupted, quitting");
- return;
- }
-
- try {
- final NodeId self = clusterService.getLocalNode().id();
- Set<ControllerNode> nodes = clusterService.getNodes();
-
- ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
- .transform(toNodeId())
- .toList();
-
- if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
- log.trace("No other peers in the cluster.");
- return;
- }
-
- NodeId peer;
- do {
- int idx = RandomUtils.nextInt(0, nodeIds.size());
- peer = nodeIds.get(idx);
- } while (peer.equals(self));
-
- HostAntiEntropyAdvertisement ad = createAdvertisement();
-
- if (Thread.currentThread().isInterrupted()) {
- log.info("Interrupted, quitting");
- return;
- }
-
- try {
- unicastMessage(peer, HOST_ANTI_ENTROPY_ADVERTISEMENT, ad);
- } catch (IOException e) {
- log.debug("Failed to send anti-entropy advertisement to {}", peer);
- return;
- }
- } catch (Exception e) {
- // catch all Exception to avoid Scheduled task being suppressed.
- log.error("Exception thrown while sending advertisement", e);
- }
- }
- }
-
- private HostAntiEntropyAdvertisement createAdvertisement() {
- final NodeId self = clusterService.getLocalNode().id();
-
- Map<HostFragmentId, Timestamp> timestamps = new HashMap<>(hosts.size());
- Map<HostId, Timestamp> tombstones = new HashMap<>(removedHosts.size());
-
- hosts.forEach((hostId, hostInfo) -> {
- final ProviderId providerId = hostInfo.providerId();
- timestamps.put(new HostFragmentId(hostId, providerId), hostInfo.timestamp());
- });
-
- removedHosts.forEach((hostId, timestamped) -> {
- tombstones.put(hostId, timestamped.timestamp());
- });
-
- return new HostAntiEntropyAdvertisement(self, timestamps, tombstones);
- }
-
- private synchronized void handleAntiEntropyAdvertisement(HostAntiEntropyAdvertisement ad) {
-
- final NodeId sender = ad.sender();
-
- for (Entry<HostId, StoredHost> host : hosts.entrySet()) {
- // for each locally live Hosts...
- final HostId hostId = host.getKey();
- final StoredHost localHost = host.getValue();
- final ProviderId providerId = localHost.providerId();
- final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
- final Timestamp localLiveTimestamp = localHost.timestamp();
-
- Timestamp remoteTimestamp = ad.timestamps().get(hostFragId);
- if (remoteTimestamp == null) {
- remoteTimestamp = ad.tombstones().get(hostId);
- }
- if (remoteTimestamp == null ||
- localLiveTimestamp.compareTo(remoteTimestamp) > 0) {
-
- // local is more recent, push
- // TODO: annotation is lost
- final HostDescription desc = new DefaultHostDescription(
- localHost.mac(),
- localHost.vlan(),
- localHost.location(),
- localHost.ipAddresses());
- try {
- unicastMessage(sender, HOST_UPDATED_MSG,
- new InternalHostEvent(providerId, hostId, desc, localHost.timestamp()));
- } catch (IOException e1) {
- log.debug("Failed to send advertisement response", e1);
- }
- }
-
- final Timestamp remoteDeadTimestamp = ad.tombstones().get(hostId);
- if (remoteDeadTimestamp != null &&
- remoteDeadTimestamp.compareTo(localLiveTimestamp) > 0) {
- // sender has recent remove
- notifyDelegateIfNotNull(removeHostInternal(hostId, remoteDeadTimestamp));
- }
- }
-
- for (Entry<HostId, Timestamped<Host>> dead : removedHosts.entrySet()) {
- // for each locally dead Hosts
- final HostId hostId = dead.getKey();
- final Timestamp localDeadTimestamp = dead.getValue().timestamp();
-
- // TODO: pick proper ProviderId, when supporting multi-provider
- final ProviderId providerId = dead.getValue().value().providerId();
- final HostFragmentId hostFragId = new HostFragmentId(hostId, providerId);
-
- final Timestamp remoteLiveTimestamp = ad.timestamps().get(hostFragId);
- if (remoteLiveTimestamp != null &&
- localDeadTimestamp.compareTo(remoteLiveTimestamp) > 0) {
- // sender has zombie, push
- try {
- unicastMessage(sender, HOST_REMOVED_MSG,
- new InternalHostRemovedEvent(hostId, localDeadTimestamp));
- } catch (IOException e1) {
- log.debug("Failed to send advertisement response", e1);
- }
- }
- }
-
- for (Entry<HostId, Timestamp> e : ad.tombstones().entrySet()) {
- // for each remote tombstone advertisement...
- final HostId hostId = e.getKey();
- final Timestamp adRemoveTimestamp = e.getValue();
-
- final StoredHost storedHost = hosts.get(hostId);
- if (storedHost == null) {
- continue;
- }
- if (adRemoveTimestamp.compareTo(storedHost.timestamp()) > 0) {
- // sender has recent remove info, locally remove
- notifyDelegateIfNotNull(removeHostInternal(hostId, adRemoveTimestamp));
- }
- }
-
- // if remote ad has something unknown, actively sync
- for (HostFragmentId key : ad.timestamps().keySet()) {
- if (!hosts.containsKey(key.hostId())) {
- HostAntiEntropyAdvertisement myAd = createAdvertisement();
- try {
- unicastMessage(sender, HOST_ANTI_ENTROPY_ADVERTISEMENT, myAd);
- break;
- } catch (IOException e) {
- log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
- }
- }
- }
- }
-
- private final class InternalHostAntiEntropyAdvertisementListener
- implements ClusterMessageHandler {
-
- @Override
- public void handle(ClusterMessage message) {
- log.trace("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
- HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
- try {
- handleAntiEntropyAdvertisement(advertisement);
- } catch (Exception e) {
- log.warn("Exception thrown handling Host advertisements", e);
- }
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStoreMessageSubjects.java
deleted file mode 100644
index 549d0ce..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/GossipHostStoreMessageSubjects.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.host.impl;
-
-import org.onosproject.store.cluster.messaging.MessageSubject;
-
-public final class GossipHostStoreMessageSubjects {
- private GossipHostStoreMessageSubjects() {}
-
- public static final MessageSubject HOST_UPDATED_MSG
- = new MessageSubject("peer-host-updated");
- public static final MessageSubject HOST_REMOVED_MSG
- = new MessageSubject("peer-host-removed");
- public static final MessageSubject HOST_ANTI_ENTROPY_ADVERTISEMENT
- = new MessageSubject("host-enti-entropy-advertisement");;
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/HostAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/HostAntiEntropyAdvertisement.java
deleted file mode 100644
index 0df0990..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/HostAntiEntropyAdvertisement.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.host.impl;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Map;
-
-import org.onosproject.cluster.NodeId;
-import org.onosproject.net.HostId;
-import org.onosproject.store.Timestamp;
-
-/**
- * Host AE Advertisement message.
- */
-public final class HostAntiEntropyAdvertisement {
-
- private final NodeId sender;
- private final Map<HostFragmentId, Timestamp> timestamps;
- private final Map<HostId, Timestamp> tombstones;
-
-
- public HostAntiEntropyAdvertisement(NodeId sender,
- Map<HostFragmentId, Timestamp> timestamps,
- Map<HostId, Timestamp> tombstones) {
- this.sender = checkNotNull(sender);
- this.timestamps = checkNotNull(timestamps);
- this.tombstones = checkNotNull(tombstones);
- }
-
- public NodeId sender() {
- return sender;
- }
-
- public Map<HostFragmentId, Timestamp> timestamps() {
- return timestamps;
- }
-
- public Map<HostId, Timestamp> tombstones() {
- return tombstones;
- }
-
- // For serializer
- @SuppressWarnings("unused")
- private HostAntiEntropyAdvertisement() {
- this.sender = null;
- this.timestamps = null;
- this.tombstones = null;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/HostClockManager.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/HostClockManager.java
deleted file mode 100644
index f96568b..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/HostClockManager.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.host.impl;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.net.HostId;
-import org.onosproject.net.host.HostClockService;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.service.WallClockTimestamp;
-import org.slf4j.Logger;
-
-/**
- * HostClockService to issue Timestamps based on local wallclock time.
- */
-@Component(immediate = true)
-@Service
-public class HostClockManager implements HostClockService {
-
- private final Logger log = getLogger(getClass());
-
- @Activate
- public void activate() {
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- @Override
- public Timestamp getTimestamp(HostId hostId) {
- return new WallClockTimestamp();
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/HostFragmentId.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/HostFragmentId.java
deleted file mode 100644
index 19810ed..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/HostFragmentId.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.host.impl;
-
-import java.util.Objects;
-
-import org.onosproject.net.HostId;
-import org.onosproject.net.provider.ProviderId;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * Identifier for HostDescription from a Provider.
- */
-public final class HostFragmentId {
- public final ProviderId providerId;
- public final HostId hostId;
-
- public HostFragmentId(HostId hostId, ProviderId providerId) {
- this.providerId = providerId;
- this.hostId = hostId;
- }
-
- public HostId hostId() {
- return hostId;
- }
-
- public ProviderId providerId() {
- return providerId;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(providerId, hostId);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!(obj instanceof HostFragmentId)) {
- return false;
- }
- HostFragmentId that = (HostFragmentId) obj;
- return Objects.equals(this.hostId, that.hostId) &&
- Objects.equals(this.providerId, that.providerId);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("providerId", providerId)
- .add("hostId", hostId)
- .toString();
- }
-
- // for serializer
- @SuppressWarnings("unused")
- private HostFragmentId() {
- this.providerId = null;
- this.hostId = null;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/InternalHostEvent.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/InternalHostEvent.java
deleted file mode 100644
index e7ba997..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/InternalHostEvent.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.host.impl;
-
-import org.onosproject.net.HostId;
-import org.onosproject.net.host.HostDescription;
-import org.onosproject.net.provider.ProviderId;
-import org.onosproject.store.Timestamp;
-
-/**
- * Information published by GossipHostStore to notify peers of a host
- * change (create/update) event.
- */
-public class InternalHostEvent {
-
- private final ProviderId providerId;
- private final HostId hostId;
- private final HostDescription hostDescription;
- private final Timestamp timestamp;
-
- public InternalHostEvent(ProviderId providerId, HostId hostId,
- HostDescription hostDescription, Timestamp timestamp) {
- this.providerId = providerId;
- this.hostId = hostId;
- this.hostDescription = hostDescription;
- this.timestamp = timestamp;
- }
-
- public ProviderId providerId() {
- return providerId;
- }
-
- public HostId hostId() {
- return hostId;
- }
-
- public HostDescription hostDescription() {
- return hostDescription;
- }
-
- public Timestamp timestamp() {
- return timestamp;
- }
-
- // Needed for serialization.
- @SuppressWarnings("unused")
- private InternalHostEvent() {
- providerId = null;
- hostId = null;
- hostDescription = null;
- timestamp = null;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/InternalHostRemovedEvent.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/InternalHostRemovedEvent.java
deleted file mode 100644
index a4a72da..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/InternalHostRemovedEvent.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.host.impl;
-
-import org.onosproject.net.HostId;
-import org.onosproject.store.Timestamp;
-
-/**
- * Information published by GossipHostStore to notify peers of a host
- * removed event.
- */
-public class InternalHostRemovedEvent {
-
- private final HostId hostId;
- private final Timestamp timestamp;
-
- public InternalHostRemovedEvent(HostId hostId, Timestamp timestamp) {
- this.hostId = hostId;
- this.timestamp = timestamp;
- }
-
- public HostId hostId() {
- return hostId;
- }
-
- public Timestamp timestamp() {
- return timestamp;
- }
-
- // for serialization.
- @SuppressWarnings("unused")
- private InternalHostRemovedEvent() {
- hostId = null;
- timestamp = null;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStoreMessageSubjects.java
deleted file mode 100644
index d6daeb9..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStoreMessageSubjects.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.intent.impl;
-
-import org.onosproject.store.cluster.messaging.MessageSubject;
-
-/**
- * Message subjects for internal gossip intent store node-to-node messages.
- */
-public final class GossipIntentStoreMessageSubjects {
- private GossipIntentStoreMessageSubjects() {}
-
- public static final MessageSubject INTENT_UPDATED_MSG
- = new MessageSubject("peer-intent-updated");
- public static final MessageSubject INTENT_SET_INSTALLABLES_MSG
- = new MessageSubject("peer-intent-set-installables");
- public static final MessageSubject INTENT_ANTI_ENTROPY_ADVERTISEMENT
- = new MessageSubject("intent-anti-entropy-advertisement");
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentClockManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentClockManager.java
deleted file mode 100644
index a0d7d2f..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentClockManager.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.intent.impl;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.net.intent.IntentClockService;
-import org.onosproject.net.intent.IntentId;
-import org.onosproject.store.Timestamp;
-import org.onosproject.store.service.WallClockTimestamp;
-import org.slf4j.Logger;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * IntentClockService that issues timestamps based on local wallclock time.
- */
-@Component(immediate = true)
-@Service
-public class IntentClockManager implements IntentClockService {
-
- private final Logger log = getLogger(getClass());
-
- @Activate
- public void activate() {
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- @Override
- public Timestamp getTimestamp(IntentId intentId) {
- return new WallClockTimestamp();
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/InternalIntentEvent.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/InternalIntentEvent.java
deleted file mode 100644
index bb5c440..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/InternalIntentEvent.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.intent.impl;
-
-import org.onosproject.net.intent.Intent;
-import org.onosproject.net.intent.IntentId;
-import org.onosproject.net.intent.IntentState;
-import org.onosproject.store.Timestamp;
-
-/**
- * Information published by GossipIntentStore to notify peers of an intent
- * creation or state update event.
- */
-public class InternalIntentEvent {
-
- private final IntentId intentId;
- private final Intent intent;
- private final IntentState state;
- private final Timestamp timestamp;
-
- public InternalIntentEvent(IntentId intentId, Intent intent, IntentState state,
- Timestamp timestamp) {
- this.intentId = intentId;
- this.intent = intent;
- this.state = state;
- this.timestamp = timestamp;
- }
-
- public IntentId intentId() {
- return intentId;
- }
-
- public Intent intent() {
- return intent;
- }
-
- public IntentState state() {
- return state;
- }
-
- public Timestamp timestamp() {
- return timestamp;
- }
-
- // Needed for serialization.
- @SuppressWarnings("unused")
- private InternalIntentEvent() {
- intentId = null;
- intent = null;
- state = null;
- timestamp = null;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/InternalSetInstallablesEvent.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/InternalSetInstallablesEvent.java
deleted file mode 100644
index e0e783b..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/InternalSetInstallablesEvent.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.intent.impl;
-
-import org.onosproject.net.intent.Intent;
-import org.onosproject.net.intent.IntentId;
-import org.onosproject.store.Timestamp;
-
-import java.util.List;
-
-/**
- * Information published by GossipIntentStore to notify peers of an intent
- * set installables event.
- */
-public class InternalSetInstallablesEvent {
-
- private final IntentId intentId;
- private final List<Intent> installables;
- private final Timestamp timestamp;
-
- public InternalSetInstallablesEvent(IntentId intentId,
- List<Intent> installables,
- Timestamp timestamp) {
- this.intentId = intentId;
- this.installables = installables;
- this.timestamp = timestamp;
- }
-
- public IntentId intentId() {
- return intentId;
- }
-
- public List<Intent> installables() {
- return installables;
- }
-
- public Timestamp timestamp() {
- return timestamp;
- }
-
- // Needed for serialization.
- @SuppressWarnings("unused")
- private InternalSetInstallablesEvent() {
- intentId = null;
- installables = null;
- timestamp = null;
- }
-}
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 8996e3b..bf67aa3 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -51,6 +51,7 @@
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DefaultDevice;
import org.onosproject.net.DefaultEdgeLink;
+import org.onosproject.net.DefaultHost;
import org.onosproject.net.DefaultLink;
import org.onosproject.net.DefaultPath;
import org.onosproject.net.DefaultPort;
@@ -267,6 +268,7 @@
DefaultControllerNode.class,
DefaultDevice.class,
DefaultDeviceDescription.class,
+ DefaultHost.class,
DefaultLinkDescription.class,
Port.class,
DefaultPortDescription.class,