Implement host probing retry with major refactoring
- Implement probe retry
- Switch to typical core/provider design pattern for HostProbingService
and as a result decoupling the dependency between SR and HostLocationProvider
Change-Id: I33a15af580677ea376b421ac3e26f9821dcca844
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DefaultHostProbe.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DefaultHostProbe.java
new file mode 100644
index 0000000..5ab72fc
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DefaultHostProbe.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.host.impl;
+
+import org.onlab.packet.MacAddress;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultHost;
+import org.onosproject.net.Host;
+import org.onosproject.net.host.ProbeMode;
+import org.onosproject.net.host.HostProbe;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Internal data structure to record the info of a host with location that is under verification.
+ */
+class DefaultHostProbe extends DefaultHost implements HostProbe {
+ private ConnectPoint connectPoint;
+ private int retry;
+ private ProbeMode mode;
+ private MacAddress probeMac;
+
+ /**
+ * Constructs DefaultHostProbe with given retry.
+ *
+ * @param host host to be probed
+ * @param connectPoint location to be verified
+ * @param probeMac source MAC address of the probe
+ * @param mode probe mode
+ * @param retry number of retry
+ */
+ DefaultHostProbe(Host host, ConnectPoint connectPoint, ProbeMode mode, MacAddress probeMac, int retry) {
+ super(host.providerId(), host.id(), host.mac(), host.vlan(), host.locations(), host.ipAddresses(),
+ host.configured());
+
+ this.connectPoint = connectPoint;
+ this.mode = mode;
+ this.probeMac = probeMac;
+ this.retry = retry;
+ }
+
+ @Override
+ public ConnectPoint connectPoint() {
+ return connectPoint;
+ }
+
+ @Override
+ public int retry() {
+ return retry;
+ }
+
+ @Override
+ public void decreaseRetry() {
+ this.retry -= 1;
+ }
+
+ @Override
+ public ProbeMode mode() {
+ return mode;
+ }
+
+ @Override
+ public MacAddress probeMac() {
+ return probeMac;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DefaultHostProbe)) {
+ return false;
+ }
+ DefaultHostProbe that = (DefaultHostProbe) o;
+ return (super.equals(o) &&
+ Objects.equals(this.connectPoint, that.connectPoint) &&
+ Objects.equals(this.retry, that.retry) &&
+ Objects.equals(this.mode, that.mode)) &&
+ Objects.equals(this.probeMac, that.probeMac);
+ }
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), connectPoint, retry, mode, probeMac);
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(getClass())
+ .add("host", super.toString())
+ .add("location", connectPoint)
+ .add("retry", retry)
+ .add("mode", mode)
+ .add("probeMac", probeMac)
+ .toString();
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DefaultHostProbeStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DefaultHostProbeStore.java
new file mode 100644
index 0000000..33cb7ad
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DefaultHostProbeStore.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.host.impl;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.MacAddress;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Host;
+import org.onosproject.net.host.HostProbe;
+import org.onosproject.net.host.HostProbeStore;
+import org.onosproject.net.host.HostProbingEvent;
+import org.onosproject.net.host.ProbeMode;
+import org.onosproject.net.host.HostProbingStoreDelegate;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+@Component(immediate = true)
+@Service
+public class DefaultHostProbeStore extends AbstractStore<HostProbingEvent, HostProbingStoreDelegate>
+ implements HostProbeStore {
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ private final Logger log = getLogger(getClass());
+
+ // TODO make this configurable
+ private static final int PROBE_TIMEOUT_MS = 3000;
+
+ private AtomicCounter hostProbeIndex;
+ private Cache<MacAddress, HostProbe> probingHostsCache;
+ private ConsistentMap<MacAddress, HostProbe> probingHostsConsistentMap;
+ private Map<MacAddress, HostProbe> probingHosts;
+ private MapEventListener<MacAddress, HostProbe> probingHostListener = new ProbingHostListener();
+ private ScheduledExecutorService cacheCleaner;
+ private ScheduledExecutorService locationRemover;
+
+ @Activate
+ public void activate() {
+ KryoNamespace.Builder pendingHostSerializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(DefaultHostProbe.class)
+ .register(ProbeMode.class);
+ probingHostsConsistentMap = storageService.<MacAddress, HostProbe>consistentMapBuilder()
+ .withName("onos-hosts-pending")
+ .withRelaxedReadConsistency()
+ .withSerializer(Serializer.using(pendingHostSerializer.build()))
+ .build();
+ probingHostsConsistentMap.addListener(probingHostListener);
+ probingHosts = probingHostsConsistentMap.asJavaMap();
+
+ hostProbeIndex = storageService.atomicCounterBuilder()
+ .withName("onos-hosts-probe-index")
+ .build()
+ .asAtomicCounter();
+
+ probingHostsCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+ .removalListener((RemovalNotification<MacAddress, HostProbe> notification) -> {
+ MacAddress probeMac = notification.getKey();
+ switch (notification.getCause()) {
+ case EXPIRED:
+ case REPLACED:
+ probingHosts.computeIfPresent(probeMac, (k, v) -> {
+ v.decreaseRetry();
+ return v;
+ });
+ break;
+ case EXPLICIT:
+ break;
+ default:
+ log.warn("Remove {} from pendingHostLocations for unexpected reason {}",
+ notification.getKey(), notification.getCause());
+ }
+ }).build();
+
+ cacheCleaner = newSingleThreadScheduledExecutor(
+ groupedThreads("onos/host/hostprobestore", "cache-cleaner", log));
+ cacheCleaner.scheduleAtFixedRate(probingHostsCache::cleanUp, 0, PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ locationRemover = newSingleThreadScheduledExecutor(
+ groupedThreads("onos/host/hostprobestore", "loc-remover", log));
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ cacheCleaner.shutdown();
+ locationRemover.shutdown();
+ probingHostsCache.cleanUp();
+
+ log.info("Stopped");
+ }
+
+ @Override
+ public MacAddress addProbingHost(Host host, ConnectPoint connectPoint, ProbeMode probeMode,
+ MacAddress probeMac, int retry) {
+ if (probeMac == null) {
+ probeMac = generateProbeMac();
+ }
+ DefaultHostProbe probingHost = new DefaultHostProbe(host, connectPoint, probeMode, probeMac, retry);
+ probingHostsCache.put(probeMac, probingHost);
+ probingHosts.put(probeMac, probingHost);
+ return probeMac;
+ }
+
+ @Override
+ public void removeProbingHost(MacAddress probeMac) {
+ probingHostsCache.invalidate(probeMac);
+ probingHosts.remove(probeMac);
+ }
+
+ private MacAddress generateProbeMac() {
+ // Use ONLab OUI (3 bytes) + atomic counter (3 bytes) as the source MAC of the probe
+ long nextIndex = hostProbeIndex.incrementAndGet();
+ return MacAddress.valueOf(MacAddress.NONE.toLong() + nextIndex);
+ }
+
+ private class ProbingHostListener implements MapEventListener<MacAddress, HostProbe> {
+ @Override
+ public void event(MapEvent<MacAddress, HostProbe> event) {
+ HostProbe newValue = Versioned.valueOrNull(event.newValue());
+ HostProbe oldValue = Versioned.valueOrNull(event.oldValue());
+
+ HostProbingEvent hostProbingEvent;
+ switch (event.type()) {
+ case INSERT:
+ hostProbingEvent = new HostProbingEvent(HostProbingEvent.Type.PROBE_REQUESTED, newValue);
+ notifyDelegate(hostProbingEvent);
+ break;
+ case UPDATE:
+ // Fail VERIFY probe immediately. Only allow DISCOVER probe to retry.
+ if (newValue.retry() > 0) {
+ if (newValue.mode() == ProbeMode.DISCOVER) {
+ hostProbingEvent = new HostProbingEvent(HostProbingEvent.Type.PROBE_TIMEOUT,
+ newValue, oldValue);
+ notifyDelegate(hostProbingEvent);
+ } else {
+ hostProbingEvent = new HostProbingEvent(HostProbingEvent.Type.PROBE_FAIL,
+ newValue, oldValue);
+ notifyDelegate(hostProbingEvent);
+ }
+ } else {
+ // Remove from pendingHost and let the remove listener generates the event
+ locationRemover.execute(() -> probingHosts.remove(event.key()));
+ }
+ break;
+ case REMOVE:
+ if (oldValue.retry() > 0) {
+ hostProbingEvent = new HostProbingEvent(HostProbingEvent.Type.PROBE_COMPLETED, oldValue);
+ notifyDelegate(hostProbingEvent);
+ } else {
+ hostProbingEvent = new HostProbingEvent(HostProbingEvent.Type.PROBE_FAIL, oldValue);
+ notifyDelegate(hostProbingEvent);
+ }
+ break;
+ default:
+ log.warn("Unknown map event type: {}", event.type());
+ }
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
index aa811ba..d179288 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/host/impl/DistributedHostStore.java
@@ -15,9 +15,6 @@
*/
package org.onosproject.store.host.impl;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
@@ -43,18 +40,15 @@
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.host.HostStore;
import org.onosproject.net.host.HostStoreDelegate;
-import org.onosproject.net.host.HostLocationProbingService.ProbeMode;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.DistributedPrimitive.Status;
-import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import java.util.Collection;
@@ -62,11 +56,9 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -93,47 +85,16 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
- private AtomicCounter hostProbeIndex;
private ConsistentMap<HostId, DefaultHost> hostsConsistentMap;
private Map<HostId, DefaultHost> hosts;
private Map<IpAddress, Set<Host>> hostsByIp;
private MapEventListener<HostId, DefaultHost> hostLocationTracker =
new HostLocationTracker();
- private ConsistentMap<MacAddress, PendingHostLocation> pendingHostsConsistentMap;
- private Map<MacAddress, PendingHostLocation> pendingHosts;
- private MapEventListener<MacAddress, PendingHostLocation> pendingHostListener =
- new PendingHostListener();
-
private ScheduledExecutorService executor;
- private ScheduledExecutorService cacheCleaner;
- private ScheduledExecutorService locationRemover;
private Consumer<Status> statusChangeListener;
- // TODO make this configurable
- private static final int PROBE_TIMEOUT_MS = 3000;
-
- private Cache<MacAddress, PendingHostLocation> pendingHostsCache = CacheBuilder.newBuilder()
- .expireAfterWrite(PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
- .removalListener((RemovalNotification<MacAddress, PendingHostLocation> notification) -> {
- switch (notification.getCause()) {
- case EXPIRED:
- PendingHostLocation expired = notification.getValue();
- if (expired != null) {
- if (timeoutPendingHostLocation(notification.getKey())) {
- log.info("Evict {} from pendingHosts due to probe timeout", notification.getValue());
- }
- }
- break;
- case EXPLICIT:
- break;
- default:
- log.warn("Remove {} from pendingHostLocations for unexpected reason {}",
- notification.getKey(), notification.getCause());
- }
- }).build();
-
@Activate
public void activate() {
KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
@@ -146,28 +107,6 @@
hostsConsistentMap.addListener(hostLocationTracker);
hosts = hostsConsistentMap.asJavaMap();
- KryoNamespace.Builder pendingHostSerializer = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(PendingHostLocation.class)
- .register(ProbeMode.class);
- pendingHostsConsistentMap = storageService.<MacAddress, PendingHostLocation>consistentMapBuilder()
- .withName("onos-hosts-pending")
- .withRelaxedReadConsistency()
- .withSerializer(Serializer.using(pendingHostSerializer.build()))
- .build();
- pendingHostsConsistentMap.addListener(pendingHostListener);
- pendingHosts = pendingHostsConsistentMap.asJavaMap();
-
- hostProbeIndex = storageService.atomicCounterBuilder()
- .withName("onos-hosts-probe-index")
- .build()
- .asAtomicCounter();
-
- cacheCleaner = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "cache-cleaner", log));
- cacheCleaner.scheduleAtFixedRate(pendingHostsCache::cleanUp, 0, PROBE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-
- locationRemover = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "loc-remover", log));
-
executor = newSingleThreadScheduledExecutor(groupedThreads("onos/hosts", "status-listener", log));
statusChangeListener = status -> {
if (status == Status.ACTIVE) {
@@ -182,9 +121,6 @@
@Deactivate
public void deactivate() {
hostsConsistentMap.removeListener(hostLocationTracker);
-
- cacheCleaner.shutdown();
- locationRemover.shutdown();
executor.shutdown();
log.info("Stopped");
@@ -425,41 +361,6 @@
return ImmutableSet.copyOf(filtered);
}
- @Override
- public MacAddress addPendingHostLocation(HostId hostId, ConnectPoint connectPoint, ProbeMode probeMode) {
- // Use ONLab OUI (3 bytes) + atomic counter (3 bytes) as the source MAC of the probe
- long nextIndex = hostProbeIndex.getAndIncrement();
- MacAddress probeMac = MacAddress.valueOf(MacAddress.NONE.toLong() + nextIndex);
- PendingHostLocation phl = new PendingHostLocation(hostId, connectPoint, probeMode);
-
- pendingHostsCache.put(probeMac, phl);
- pendingHosts.put(probeMac, phl);
-
- return probeMac;
- }
-
- @Override
- public void removePendingHostLocation(MacAddress probeMac) {
- // Add the host location if probe replied in-time in DISCOVER mode
- Optional.ofNullable(pendingHosts.get(probeMac)).ifPresent(phl -> {
- if (phl.probeMode() == ProbeMode.DISCOVER) {
- HostLocation newLocation = new HostLocation(phl.connectPoint(), System.currentTimeMillis());
- appendLocation(phl.hostId(), newLocation);
- }
- });
-
- pendingHostsCache.invalidate(probeMac);
- pendingHosts.remove(probeMac);
- }
-
- private boolean timeoutPendingHostLocation(MacAddress probeMac) {
- PendingHostLocation phl = pendingHosts.computeIfPresent(probeMac, (k, v) -> {
- v.setExpired(true);
- return v;
- });
- return phl != null;
- }
-
private Set<Host> filter(Collection<DefaultHost> collection, Predicate<DefaultHost> predicate) {
return collection.stream().filter(predicate).collect(Collectors.toSet());
}
@@ -543,29 +444,4 @@
}
}
}
-
- private class PendingHostListener implements MapEventListener<MacAddress, PendingHostLocation> {
- @Override
- public void event(MapEvent<MacAddress, PendingHostLocation> event) {
- Versioned<PendingHostLocation> newValue = event.newValue();
- switch (event.type()) {
- case INSERT:
- break;
- case UPDATE:
- // Remove the host location if probe timeout in VERIFY mode
- if (newValue.value().expired() && newValue.value().probeMode() == ProbeMode.VERIFY) {
- locationRemover.execute(() -> {
- pendingHosts.remove(event.key());
- removeLocation(newValue.value().hostId(),
- new HostLocation(newValue.value().connectPoint(), 0L));
- });
- }
- break;
- case REMOVE:
- break;
- default:
- log.warn("Unknown map event type: {}", event.type());
- }
- }
- }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/host/impl/PendingHostLocation.java b/core/store/dist/src/main/java/org/onosproject/store/host/impl/PendingHostLocation.java
deleted file mode 100644
index dc10507..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/host/impl/PendingHostLocation.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.host.impl;
-
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.HostId;
-import org.onosproject.net.host.HostLocationProbingService.ProbeMode;
-
-import java.util.Objects;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-/**
- * Internal data structure to record the info of a host with location that is under verification.
- */
-class PendingHostLocation {
- private HostId hostId;
- private ConnectPoint connectPoint;
- private boolean expired;
- private ProbeMode probeMode;
-
- /**
- * Constructs PendingHostLocation.
- *
- * @param hostId Host ID
- * @param connectPoint location to be verified
- * @param probeMode probe mode
- */
- PendingHostLocation(HostId hostId, ConnectPoint connectPoint, ProbeMode probeMode) {
- this.hostId = hostId;
- this.connectPoint = connectPoint;
- this.expired = false;
- this.probeMode = probeMode;
- }
-
- /**
- * Gets HostId of this entry.
- *
- * @return host id
- */
- HostId hostId() {
- return hostId;
- }
-
- /**
- * Gets connect point of this entry.
- *
- * @return connect point
- */
- ConnectPoint connectPoint() {
- return connectPoint;
- }
-
- /**
- * Determine whether this probe is expired or not.
- *
- * @return true if this entry is expired and waiting to be removed from the cache
- */
- boolean expired() {
- return expired;
- }
-
- /**
- * Sets whether this probe is expired or not.
- *
- * @param expired true if this entry is expired and waiting to be removed from the cache
- */
- void setExpired(boolean expired) {
- this.expired = expired;
- }
-
- /**
- * Gets probe mode of this entry.
- *
- * @return probe mode
- */
- ProbeMode probeMode() {
- return probeMode;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof PendingHostLocation)) {
- return false;
- }
- PendingHostLocation that = (PendingHostLocation) o;
- return (Objects.equals(this.hostId, that.hostId) &&
- Objects.equals(this.connectPoint, that.connectPoint) &&
- Objects.equals(this.expired, that.expired) &&
- Objects.equals(this.probeMode, that.probeMode));
- }
- @Override
- public int hashCode() {
- return Objects.hash(hostId, connectPoint, expired, probeMode);
- }
-
- @Override
- public String toString() {
- return toStringHelper(getClass())
- .add("hostId", hostId)
- .add("location", connectPoint)
- .add("expired", expired)
- .add("probeMode", probeMode)
- .toString();
- }
-}