Updating IntentCleanup to check for stalled *_REQ and *ING intents.

Change-Id: Ibe06ee99463bb8230acf9751da4fb1012859b0ea
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java b/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
index 7950949..0c3e649 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
@@ -43,10 +43,11 @@
      * Returns an iterable of all intent data objects in the store.
      *
      * @param localOnly should only intents for which this instance is master
-     *                  should be returned
+     *                  be returned
+     * @param olderThan specified duration in milliseconds (0 for "now")
      * @return iterable of all intent data objects
      */
-    Iterable<IntentData> getIntentData(boolean localOnly);
+    Iterable<IntentData> getIntentData(boolean localOnly, long olderThan);
 
     /**
      * Returns the state of the specified intent.
@@ -119,4 +120,22 @@
      * @return pending intents
      */
     Iterable<Intent> getPending();
+
+    /**
+     * Returns the intent data objects that are pending processing.
+     *
+     * @return pending intent data objects
+     */
+    Iterable<IntentData> getPendingData();
+
+    /**
+     * Returns the intent data objects that are pending processing for longer
+     * than the specified duration.
+     *
+     * @param localOnly  should only intents for which this instance is master
+     *                   be returned
+     * @param olderThan specified duration in milliseconds (0 for "now")
+     * @return pending intent data objects
+     */
+    Iterable<IntentData> getPendingData(boolean localOnly, long olderThan);
 }
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java
index cafa998..de57781 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java
@@ -28,6 +28,7 @@
 import org.onosproject.net.intent.IntentListener;
 import org.onosproject.net.intent.IntentService;
 import org.onosproject.net.intent.IntentStore;
+import org.onosproject.net.intent.Key;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
@@ -41,12 +42,16 @@
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.net.intent.IntentState.CORRUPT;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * FIXME Class to cleanup Intents in CORRUPT state.
- * FIXME move this to its own file eventually (but need executor for now)
+ * This component cleans up intents that have encountered errors or otherwise
+ * stalled during installation or withdrawal.
+ * <p>
+ * It periodically polls (based on configured period) for pending and CORRUPT
+ * intents from the store and retries. It also listens for CORRUPT event
+ * notifications, which signify errors in processing, and retries.
+ * </p>
  */
 @Component(immediate = true)
 public class IntentCleanup implements Runnable, IntentListener {
@@ -58,6 +63,7 @@
     @Property(name = "period", intValue = DEFAULT_PERIOD,
               label = "Frequency in ms between cleanup runs")
     protected int period = DEFAULT_PERIOD;
+    private long periodMs;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected IntentService service;
@@ -126,7 +132,7 @@
             }
         };
 
-        long periodMs = period * 1000; //convert to ms
+        periodMs = period * 1_000; //convert to ms
         timer.scheduleAtFixedRate(timerTask, periodMs, periodMs);
     }
 
@@ -140,41 +146,79 @@
         }
     }
 
+    private void resubmitCorrupt(IntentData intentData, boolean checkThreshold) {
+        //TODO we might want to give up when retry count exceeds a threshold
+        // FIXME drop this if we exceed retry threshold
+
+
+        switch (intentData.request()) {
+            case INSTALL_REQ:
+                service.submit(intentData.intent());
+                break;
+            case WITHDRAW_REQ:
+                service.withdraw(intentData.intent());
+                break;
+            default:
+                //TODO this is an error, might want to log it
+                break;
+        }
+    }
+
+    private void resubmitPendingRequest(IntentData intentData) {
+        switch (intentData.request()) {
+            case INSTALL_REQ:
+                service.submit(intentData.intent());
+                break;
+            case WITHDRAW_REQ:
+                service.withdraw(intentData.intent());
+                break;
+            default:
+                //TODO this is an error (or could be purge), might want to log it
+                break;
+        }
+    }
+
     /**
-     * Iterate through CORRUPT intents and re-submit/withdraw.
+     * Iterate through CORRUPT intents and re-submit/withdraw appropriately.
      *
-     * FIXME we want to eventually count number of retries per intent and give up
-     * FIXME we probably also want to look at intents that have been stuck
-     *       in *_REQ or *ING for "too long".
      */
     private void cleanup() {
-        int count = 0;
-        for (IntentData intentData : store.getIntentData(true)) {
-            if (intentData.state() == CORRUPT) {
-                switch (intentData.request()) {
-                    case INSTALL_REQ:
-                        service.submit(intentData.intent());
-                        count++;
-                        break;
-                    case WITHDRAW_REQ:
-                        service.withdraw(intentData.intent());
-                        count++;
-                        break;
-                    default:
-                        //TODO this is an error
-                        break;
-                }
+        int corruptCount = 0, stuckCount = 0, pendingCount = 0;
+        for (IntentData intentData : store.getIntentData(true, periodMs)) {
+            switch (intentData.state()) {
+                case CORRUPT:
+                    resubmitCorrupt(intentData, false);
+                    corruptCount++;
+                case INSTALLING: //FALLTHROUGH
+                case WITHDRAWING:
+                    resubmitPendingRequest(intentData);
+                    stuckCount++;
+                default:
+                    //NOOP
+                    break;
             }
         }
-        log.debug("Intent cleanup ran and resubmitted {} intents", count);
+
+        for (IntentData intentData : store.getPendingData(true, periodMs)) {
+            //TODO should we do age check here, or in the store?
+            resubmitPendingRequest(intentData);
+            stuckCount++;
+        }
+
+        log.debug("Intent cleanup ran and resubmitted {} corrupt, {} stuck, and {} pending intents",
+                  corruptCount, stuckCount, pendingCount);
     }
 
     @Override
     public void event(IntentEvent event) {
+        // fast path for CORRUPT intents, retry on event notification
+        //TODO we might consider using the timer to back off for subsequent retries
         if (event.type() == IntentEvent.Type.CORRUPT) {
-            // FIXME drop this if we exceed retry threshold
-            // just run the whole cleanup script for now
-            executor.submit(this);
+            Key key = event.subject().key();
+            if (store.isMaster(key)) {
+                IntentData data = store.getIntentData(event.subject().key());
+                resubmitCorrupt(data, true);
+            }
         }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/impl/WallClockTimestamp.java b/core/store/dist/src/main/java/org/onosproject/store/impl/WallClockTimestamp.java
index a158928..e7efb29 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/impl/WallClockTimestamp.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/impl/WallClockTimestamp.java
@@ -36,6 +36,10 @@
         unixTimestamp = System.currentTimeMillis();
     }
 
+    public WallClockTimestamp(long timestamp) {
+        unixTimestamp = timestamp;
+    }
+
     @Override
     public int compareTo(Timestamp o) {
         checkArgument(o instanceof WallClockTimestamp,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index 4f0ad66..7de2d17 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -132,10 +132,13 @@
     }
 
     @Override
-    public Iterable<IntentData> getIntentData(boolean localOnly) {
-        if (localOnly) {
+    public Iterable<IntentData> getIntentData(boolean localOnly, long olderThan) {
+        if (localOnly || olderThan > 0) {
+            long now = System.currentTimeMillis();
+            final WallClockTimestamp time = new WallClockTimestamp(now - olderThan);
             return currentMap.values().stream()
-                    .filter(data -> isMaster(data.key()))
+                    .filter(data -> data.version().isOlderThan(time) &&
+                            (!localOnly || isMaster(data.key())))
                     .collect(Collectors.toList());
         }
         return currentMap.values();
@@ -261,6 +264,21 @@
                 .collect(Collectors.toList());
     }
 
+    @Override
+    public Iterable<IntentData> getPendingData() {
+        return pendingMap.values();
+    }
+
+    @Override
+    public Iterable<IntentData> getPendingData(boolean localOnly, long olderThan) {
+        long now = System.currentTimeMillis();
+        final WallClockTimestamp time = new WallClockTimestamp(now - olderThan);
+        return pendingMap.values().stream()
+                .filter(data -> data.version().isOlderThan(time) &&
+                                (!localOnly || isMaster(data.key())))
+                .collect(Collectors.toList());
+    }
+
     private void notifyDelegateIfNotNull(IntentEvent event) {
         if (event != null) {
             notifyDelegate(event);
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleIntentStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleIntentStore.java
index 186fdd9..72a5dce 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleIntentStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleIntentStore.java
@@ -36,7 +36,7 @@
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.net.intent.IntentState.*;
+import static org.onosproject.net.intent.IntentState.PURGE_REQ;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -76,11 +76,15 @@
     }
 
     @Override
-    public Iterable<IntentData> getIntentData(boolean localOnly) {
-        if (localOnly) {
-            return current.values().stream()
-                    .filter(data -> isMaster(data.key()))
+    public Iterable<IntentData> getIntentData(boolean localOnly, long olderThan) {
+        if (localOnly || olderThan > 0) {
+            long older = System.nanoTime() - olderThan * 1_000_000; //convert ms to ns
+            final SystemClockTimestamp time = new SystemClockTimestamp(older);
+            return pending.values().stream()
+                    .filter(data -> data.version().isOlderThan(time) &&
+                            (!localOnly || isMaster(data.key())))
                     .collect(Collectors.toList());
+
         }
         return Lists.newArrayList(current.values());
     }
@@ -191,4 +195,19 @@
                 .map(IntentData::intent)
                 .collect(Collectors.toList());
     }
+
+    @Override
+    public Iterable<IntentData> getPendingData() {
+        return Lists.newArrayList(pending.values());
+    }
+
+    @Override
+    public Iterable<IntentData> getPendingData(boolean localOnly, long olderThan) {
+        long older = System.nanoTime() - olderThan * 1_000_000; //convert ms to ns
+        final SystemClockTimestamp time = new SystemClockTimestamp(older);
+        return pending.values().stream()
+                .filter(data -> data.version().isOlderThan(time) &&
+                        (!localOnly || isMaster(data.key())))
+                .collect(Collectors.toList());
+    }
 }
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SystemClockTimestamp.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SystemClockTimestamp.java
index d79797e..1e98a7a 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SystemClockTimestamp.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SystemClockTimestamp.java
@@ -29,10 +29,14 @@
  */
 public class SystemClockTimestamp implements Timestamp {
 
-    private final long unixTimestamp;
+    private final long nanoTimestamp;
 
     public SystemClockTimestamp() {
-        unixTimestamp = System.nanoTime();
+        nanoTimestamp = System.nanoTime();
+    }
+
+    public SystemClockTimestamp(long timestamp) {
+        nanoTimestamp = timestamp;
     }
 
     @Override
@@ -42,12 +46,12 @@
         SystemClockTimestamp that = (SystemClockTimestamp) o;
 
         return ComparisonChain.start()
-                .compare(this.unixTimestamp, that.unixTimestamp)
+                .compare(this.nanoTimestamp, that.nanoTimestamp)
                 .result();
     }
     @Override
     public int hashCode() {
-        return Objects.hash(unixTimestamp);
+        return Objects.hash(nanoTimestamp);
     }
 
     @Override
@@ -59,17 +63,21 @@
             return false;
         }
         SystemClockTimestamp that = (SystemClockTimestamp) obj;
-        return Objects.equals(this.unixTimestamp, that.unixTimestamp);
+        return Objects.equals(this.nanoTimestamp, that.nanoTimestamp);
     }
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
-                    .add("unixTimestamp", unixTimestamp)
+                    .add("nanoTimestamp", nanoTimestamp)
                     .toString();
     }
 
+    public long nanoTimestamp() {
+        return nanoTimestamp;
+    }
+
     public long systemTimestamp() {
-        return unixTimestamp;
+        return nanoTimestamp / 1_000_000; // convert ns to ms
     }
 }