Updating IntentCleanup to check for stalled *_REQ and *ING intents.
Change-Id: Ibe06ee99463bb8230acf9751da4fb1012859b0ea
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java b/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
index 7950949..0c3e649 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
@@ -43,10 +43,11 @@
* Returns an iterable of all intent data objects in the store.
*
* @param localOnly should only intents for which this instance is master
- * should be returned
+ * be returned
+ * @param olderThan specified duration in milliseconds (0 for "now")
* @return iterable of all intent data objects
*/
- Iterable<IntentData> getIntentData(boolean localOnly);
+ Iterable<IntentData> getIntentData(boolean localOnly, long olderThan);
/**
* Returns the state of the specified intent.
@@ -119,4 +120,22 @@
* @return pending intents
*/
Iterable<Intent> getPending();
+
+ /**
+ * Returns the intent data objects that are pending processing.
+ * <p>
+ * This variant takes no mastership or age criteria; use
+ * {@link #getPendingData(boolean, long)} to restrict the result.
+ * </p>
+ *
+ * @return pending intent data objects
+ */
+ Iterable<IntentData> getPendingData();
+
+ /**
+ * Returns the intent data objects that are pending processing for longer
+ * than the specified duration.
+ * <p>
+ * An entry qualifies when its version timestamp is older than the current
+ * time minus {@code olderThan}.
+ * </p>
+ *
+ * @param localOnly should only intents for which this instance is master
+ * be returned
+ * @param olderThan minimum age, in milliseconds, an entry must have to be
+ * returned (0 for "now")
+ * @return pending intent data objects
+ */
+ Iterable<IntentData> getPendingData(boolean localOnly, long olderThan);
}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java
index cafa998..de57781 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java
@@ -28,6 +28,7 @@
import org.onosproject.net.intent.IntentListener;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.IntentStore;
+import org.onosproject.net.intent.Key;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
@@ -41,12 +42,16 @@
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.net.intent.IntentState.CORRUPT;
import static org.slf4j.LoggerFactory.getLogger;
/**
- * FIXME Class to cleanup Intents in CORRUPT state.
- * FIXME move this to its own file eventually (but need executor for now)
+ * This component cleans up intents that have encountered errors or otherwise
+ * stalled during installation or withdrawal.
+ * <p>
+ * It periodically polls (based on configured period) for pending and CORRUPT
+ * intents from the store and retries. It also listens for CORRUPT event
+ * notifications, which signify errors in processing, and retries.
+ * </p>
*/
@Component(immediate = true)
public class IntentCleanup implements Runnable, IntentListener {
@@ -58,6 +63,7 @@
@Property(name = "period", intValue = DEFAULT_PERIOD,
label = "Frequency in ms between cleanup runs")
protected int period = DEFAULT_PERIOD;
+ private long periodMs;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentService service;
@@ -126,7 +132,7 @@
}
};
- long periodMs = period * 1000; //convert to ms
+ periodMs = period * 1_000; //convert to ms
timer.scheduleAtFixedRate(timerTask, periodMs, periodMs);
}
@@ -140,41 +146,79 @@
}
}
+ /**
+  * Retries a CORRUPT intent by re-issuing the request recorded in its data:
+  * an install request is re-submitted, a withdraw request is re-withdrawn.
+  * Any other request type cannot be retried here and is logged.
+  *
+  * @param intentData     intent data to retry
+  * @param checkThreshold currently unused; reserved for giving up once a
+  *                       retry threshold is exceeded
+  */
+ private void resubmitCorrupt(IntentData intentData, boolean checkThreshold) {
+     //TODO we might want to give up when retry count exceeds a threshold
+     // FIXME drop this if we exceed retry threshold
+     switch (intentData.request()) {
+         case INSTALL_REQ:
+             service.submit(intentData.intent());
+             break;
+         case WITHDRAW_REQ:
+             service.withdraw(intentData.intent());
+             break;
+         default:
+             // nothing sensible to retry; record it instead of dropping it silently
+             log.warn("Trying to resubmit corrupt intent {} in state {} with request {}",
+                      intentData.key(), intentData.state(), intentData.request());
+             break;
+     }
+ }
+
+ /**
+  * Re-issues the request recorded in a stalled or pending intent's data:
+  * an install request is re-submitted, a withdraw request is re-withdrawn.
+  * Any other request type is left untouched and logged.
+  *
+  * @param intentData intent data whose request should be re-issued
+  */
+ private void resubmitPendingRequest(IntentData intentData) {
+     switch (intentData.request()) {
+         case INSTALL_REQ:
+             service.submit(intentData.intent());
+             break;
+         case WITHDRAW_REQ:
+             service.withdraw(intentData.intent());
+             break;
+         default:
+             // may be a legitimate request (e.g. purge), so keep this at debug level
+             log.debug("Not resubmitting pending intent {} in state {} with request {}",
+                       intentData.key(), intentData.state(), intentData.request());
+             break;
+     }
+ }
+
/**
- * Iterate through CORRUPT intents and re-submit/withdraw.
+ * Iterates through CORRUPT, stalled (INSTALLING/WITHDRAWING) and pending
+ * intents for which this instance is master and re-submits/withdraws them
+ * appropriately. Only entries older than one cleanup period are retried.
*
- * FIXME we want to eventually count number of retries per intent and give up
- * FIXME we probably also want to look at intents that have been stuck
- * in *_REQ or *ING for "too long".
*/
private void cleanup() {
- int count = 0;
- for (IntentData intentData : store.getIntentData(true)) {
- if (intentData.state() == CORRUPT) {
- switch (intentData.request()) {
- case INSTALL_REQ:
- service.submit(intentData.intent());
- count++;
- break;
- case WITHDRAW_REQ:
- service.withdraw(intentData.intent());
- count++;
- break;
- default:
- //TODO this is an error
- break;
- }
+     int corruptCount = 0;
+     int stuckCount = 0;
+     int pendingCount = 0;
+     for (IntentData intentData : store.getIntentData(true, periodMs)) {
+         switch (intentData.state()) {
+             case CORRUPT:
+                 resubmitCorrupt(intentData, false);
+                 corruptCount++;
+                 // must not fall through: that would retry the intent a second
+                 // time and also count it as stuck
+                 break;
+             case INSTALLING: //FALLTHROUGH
+             case WITHDRAWING:
+                 resubmitPendingRequest(intentData);
+                 stuckCount++;
+                 break;
+             default:
+                 //NOOP
+                 break;
}
}
- log.debug("Intent cleanup ran and resubmitted {} intents", count);
+
+     for (IntentData intentData : store.getPendingData(true, periodMs)) {
+         //TODO should we do age check here, or in the store?
+         resubmitPendingRequest(intentData);
+         // tracked separately from stuckCount so the summary below is accurate
+         pendingCount++;
+     }
+
+     log.debug("Intent cleanup ran and resubmitted {} corrupt, {} stuck, and {} pending intents",
+               corruptCount, stuckCount, pendingCount);
}
@Override
public void event(IntentEvent event) {
+     // fast path for CORRUPT intents, retry on event notification
+     //TODO we might consider using the timer to back off for subsequent retries
if (event.type() == IntentEvent.Type.CORRUPT) {
- // FIXME drop this if we exceed retry threshold
- // just run the whole cleanup script for now
- executor.submit(this);
+         Key key = event.subject().key();
+         // only the master instance for this key retries
+         if (store.isMaster(key)) {
+             IntentData data = store.getIntentData(key);
+             // NOTE(review): presumably the store can return null if the entry
+             // vanished between the event and this lookup -- confirm; guarding
+             // avoids an NPE inside the listener either way
+             if (data != null) {
+                 resubmitCorrupt(data, true);
+             }
+         }
}
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/WallClockTimestamp.java b/core/store/dist/src/main/java/org/onosproject/store/impl/WallClockTimestamp.java
index a158928..e7efb29 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/impl/WallClockTimestamp.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/WallClockTimestamp.java
@@ -36,6 +36,10 @@
unixTimestamp = System.currentTimeMillis();
}
+ /**
+  * Creates a timestamp holding the given value rather than the current time.
+  *
+  * @param timestamp value to store; the no-argument constructor stores
+  *                  {@code System.currentTimeMillis()}, so the same unit
+  *                  is expected here
+  */
+ public WallClockTimestamp(long timestamp) {
+     this.unixTimestamp = timestamp;
+ }
+
@Override
public int compareTo(Timestamp o) {
checkArgument(o instanceof WallClockTimestamp,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index 4f0ad66..7de2d17 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -132,10 +132,13 @@
}
@Override
- public Iterable<IntentData> getIntentData(boolean localOnly) {
- if (localOnly) {
+ public Iterable<IntentData> getIntentData(boolean localOnly, long olderThan) {
+ if (localOnly || olderThan > 0) {
+ long now = System.currentTimeMillis();
+ final WallClockTimestamp time = new WallClockTimestamp(now - olderThan);
return currentMap.values().stream()
- .filter(data -> isMaster(data.key()))
+ .filter(data -> data.version().isOlderThan(time) &&
+ (!localOnly || isMaster(data.key())))
.collect(Collectors.toList());
}
return currentMap.values();
@@ -261,6 +264,21 @@
.collect(Collectors.toList());
}
+ @Override
+ public Iterable<IntentData> getPendingData() {
+ // unfiltered: all values of the pending map, regardless of mastership or age
+ return pendingMap.values();
+ }
+
+ @Override
+ public Iterable<IntentData> getPendingData(boolean localOnly, long olderThan) {
+     // entries whose version predates this cutoff have been pending long enough
+     final WallClockTimestamp cutoff =
+             new WallClockTimestamp(System.currentTimeMillis() - olderThan);
+     return pendingMap.values().stream()
+             .filter(entry -> entry.version().isOlderThan(cutoff))
+             // mastership is only checked when the caller asked for local entries
+             .filter(entry -> !localOnly || isMaster(entry.key()))
+             .collect(Collectors.toList());
+ }
+
private void notifyDelegateIfNotNull(IntentEvent event) {
if (event != null) {
notifyDelegate(event);
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleIntentStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleIntentStore.java
index 186fdd9..72a5dce 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleIntentStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleIntentStore.java
@@ -36,7 +36,7 @@
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.net.intent.IntentState.*;
+import static org.onosproject.net.intent.IntentState.PURGE_REQ;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -76,11 +76,15 @@
}
@Override
- public Iterable<IntentData> getIntentData(boolean localOnly) {
- if (localOnly) {
- return current.values().stream()
- .filter(data -> isMaster(data.key()))
+ public Iterable<IntentData> getIntentData(boolean localOnly, long olderThan) {
+     if (localOnly || olderThan > 0) {
+         // cutoff is on the System.nanoTime() scale used by SystemClockTimestamp
+         long older = System.nanoTime() - olderThan * 1_000_000; //convert ms to ns
+         final SystemClockTimestamp time = new SystemClockTimestamp(older);
+         // filter the current map (not the pending one), matching the
+         // unfiltered return at the end of this method
+         return current.values().stream()
+                 .filter(data -> data.version().isOlderThan(time) &&
+                         (!localOnly || isMaster(data.key())))
.collect(Collectors.toList());
}
return Lists.newArrayList(current.values());
}
@@ -191,4 +195,19 @@
.map(IntentData::intent)
.collect(Collectors.toList());
}
+
+ @Override
+ public Iterable<IntentData> getPendingData() {
+ // unfiltered; returns a new list holding the pending map's values
+ return Lists.newArrayList(pending.values());
+ }
+
+ @Override
+ public Iterable<IntentData> getPendingData(boolean localOnly, long olderThan) {
+     final long ageNanos = olderThan * 1_000_000; //convert ms to ns
+     // entries whose version predates this cutoff have been pending long enough
+     final SystemClockTimestamp cutoff = new SystemClockTimestamp(System.nanoTime() - ageNanos);
+     return pending.values().stream()
+             .filter(entry -> entry.version().isOlderThan(cutoff))
+             // mastership is only checked when the caller asked for local entries
+             .filter(entry -> !localOnly || isMaster(entry.key()))
+             .collect(Collectors.toList());
+ }
}
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SystemClockTimestamp.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SystemClockTimestamp.java
index d79797e..1e98a7a 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SystemClockTimestamp.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SystemClockTimestamp.java
@@ -29,10 +29,14 @@
*/
public class SystemClockTimestamp implements Timestamp {
- private final long unixTimestamp;
+ private final long nanoTimestamp;
public SystemClockTimestamp() {
- unixTimestamp = System.nanoTime();
+ // System.nanoTime() has an arbitrary origin: only meaningful relative to
+ // other nanoTime readings, not as wall-clock (unix) time
+ nanoTimestamp = System.nanoTime();
+ }
+
+ /**
+  * Creates a timestamp holding the given value rather than the current
+  * {@code System.nanoTime()} reading.
+  *
+  * @param timestamp value in nanoseconds, on the {@code System.nanoTime()} scale
+  */
+ public SystemClockTimestamp(long timestamp) {
+     this.nanoTimestamp = timestamp;
}
@Override
@@ -42,12 +46,12 @@
SystemClockTimestamp that = (SystemClockTimestamp) o;
return ComparisonChain.start()
- .compare(this.unixTimestamp, that.unixTimestamp)
+ .compare(this.nanoTimestamp, that.nanoTimestamp)
.result();
}
@Override
public int hashCode() {
- return Objects.hash(unixTimestamp);
+ // hashes the same single field that equals() compares
+ return Objects.hash(nanoTimestamp);
}
@Override
@@ -59,17 +63,21 @@
return false;
}
SystemClockTimestamp that = (SystemClockTimestamp) obj;
- return Objects.equals(this.unixTimestamp, that.unixTimestamp);
+ return Objects.equals(this.nanoTimestamp, that.nanoTimestamp);
}
@Override
public String toString() {
+ // renders the class name followed by the single nanoTimestamp field
return MoreObjects.toStringHelper(getClass())
- .add("unixTimestamp", unixTimestamp)
+ .add("nanoTimestamp", nanoTimestamp)
.toString();
}
+ /**
+ * Returns the raw timestamp value held by this object.
+ *
+ * @return timestamp in nanoseconds
+ */
+ public long nanoTimestamp() {
+ return nanoTimestamp;
+ }
+
+ /**
+ * Returns the timestamp converted from nanoseconds to milliseconds
+ * (integer division, so sub-millisecond precision is dropped).
+ * NOTE(review): the no-argument constructor takes the value from
+ * System.nanoTime(), so this is presumably not an epoch-based time --
+ * confirm callers do not treat it as one.
+ *
+ * @return timestamp in milliseconds
+ */
public long systemTimestamp() {
- return unixTimestamp;
+ return nanoTimestamp / 1_000_000; // convert ns to ms
}
}