ONOS-5691 ONOS-5742 Fixing intent framework difficulties

- Refactoring AbstractAccumulator to use less blocking synchronization
- Fixing bug in AbstractAccumulator that could leave some items
  without firing
- Updated IntentStore for resubmitting pending operations

Change-Id: Iaf240da65e11ceb7d7d745cf4e25bfb8c26ed1eb
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentData.java b/core/api/src/main/java/org/onosproject/net/intent/IntentData.java
index feada17..c30993e 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentData.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/IntentData.java
@@ -89,6 +89,29 @@
     }
 
     /**
+     * Creates a new intent data object.
+     *
+     * @param intent intent this metadata references
+     * @param state intent state
+     * @param request intent request
+     * @param version version of the intent for this key
+     * @param origin ID of the node where the data was originally created
+     */
+    public IntentData(Intent intent, IntentState state, IntentState request, Timestamp version, NodeId origin) {
+        checkNotNull(intent);
+        checkNotNull(state);
+        checkNotNull(request);
+        checkNotNull(version);
+        checkNotNull(origin);
+
+        this.intent = intent;
+        this.state = state;
+        this.request = request;
+        this.version = version;
+        this.origin = origin;
+    }
+
+    /**
      * Copy constructor.
      *
      * @param intentData intent data to copy
diff --git a/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java b/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
index f6f69c5..6c0dff4 100644
--- a/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
+++ b/core/api/src/main/java/org/onosproject/net/intent/IntentStore.java
@@ -131,6 +131,15 @@
     Iterable<IntentData> getPendingData();
 
     /**
+     * Returns the intent data object that are pending processing for a specfied
+     * key.
+     *
+     * @param intentKey key to look up
+     * @return pending intent data object
+     */
+    IntentData getPendingData(Key intentKey);
+
+    /**
      * Returns the intent data objects that are pending processing for longer
      * than the specified duration.
      *
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimpleIntentStore.java b/core/common/src/test/java/org/onosproject/store/trivial/SimpleIntentStore.java
index 41ce036..02d7eb2 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimpleIntentStore.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimpleIntentStore.java
@@ -195,6 +195,11 @@
     }
 
     @Override
+    public IntentData getPendingData(Key intentKey) {
+        return pending.get(intentKey);
+    }
+
+    @Override
     public Iterable<IntentData> getPendingData(boolean localOnly, long olderThan) {
         long older = System.nanoTime() - olderThan * 1_000_000; //convert ms to ns
         final SystemClockTimestamp time = new SystemClockTimestamp(older);
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java
index 56ee709..bb180c8 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentCleanup.java
@@ -210,6 +210,13 @@
     private void cleanup() {
         int corruptCount = 0, failedCount = 0, stuckCount = 0, pendingCount = 0;
 
+        // Check the pending map first, because the check of the current map
+        // will add items to the pending map.
+        for (IntentData intentData : store.getPendingData(true, periodMs)) {
+            resubmitPendingRequest(intentData);
+            pendingCount++;
+        }
+
         for (IntentData intentData : store.getIntentData(true, periodMs)) {
             switch (intentData.state()) {
                 case FAILED:
@@ -231,11 +238,6 @@
             }
         }
 
-        for (IntentData intentData : store.getPendingData(true, periodMs)) {
-            resubmitPendingRequest(intentData);
-            pendingCount++;
-        }
-
         if (corruptCount + failedCount + stuckCount + pendingCount > 0) {
             log.debug("Intent cleanup ran and resubmitted {} corrupt, {} failed, {} stuck, and {} pending intents",
                     corruptCount, failedCount, stuckCount, pendingCount);
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
index 8ea910b..631151b 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentInstaller.java
@@ -53,7 +53,7 @@
  */
 class IntentInstaller {
 
-    private static final Logger log = getLogger(IntentManager.class);
+    private static final Logger log = getLogger(IntentInstaller.class);
 
     private IntentStore store;
     private ObjectiveTrackerService trackerService;
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
index 12c636c..5524d63 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
@@ -49,6 +49,7 @@
 import org.onosproject.net.intent.impl.compiler.PointToPointIntentCompiler;
 import org.onosproject.net.intent.impl.phase.FinalIntentProcessPhase;
 import org.onosproject.net.intent.impl.phase.IntentProcessPhase;
+import org.onosproject.net.intent.impl.phase.Skipped;
 import org.osgi.service.component.ComponentContext;
 import org.onosproject.net.resource.ResourceService;
 import org.slf4j.Logger;
@@ -428,7 +429,8 @@
                                     //FIXME
                                     log.warn("Future failed: {}", e);
                                     return null;
-                                })).collect(Collectors.toList());
+                                }))
+                        .collect(Collectors.toList());
 
                 // write multiple data to store in order
                 store.batchWrite(Tools.allOf(futures).join().stream()
@@ -449,6 +451,16 @@
     }
 
     private IntentProcessPhase createInitialPhase(IntentData data) {
+        IntentData pending = store.getPendingData(data.key());
+        if (pending == null || pending.version().isNewerThan(data.version())) {
+            /*
+                If the pending map is null, then this intent was compiled by a
+                previous batch iteration, so we can skip it.
+                If the pending map has a newer request, it will get compiled as
+                part of the next batch, so we can skip it.
+             */
+            return Skipped.getPhase();
+        }
         IntentData current = store.getIntentData(data.key());
         return newInitialPhase(processor, data, current);
     }
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/phase/Skipped.java b/core/net/src/main/java/org/onosproject/net/intent/impl/phase/Skipped.java
new file mode 100644
index 0000000..a8afde6
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/phase/Skipped.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.intent.impl.phase;
+
+import org.onosproject.net.intent.IntentData;
+
+/**
+ * Represents a phase where an intent is not compiled.
+ * <p>
+ * This should be used if a new version of the intent will immediately override
+ * this one.
+ * </p>
+ */
+public final class Skipped extends FinalIntentProcessPhase {
+
+    private static final Skipped SINGLETON = new Skipped();
+
+    /**
+     * Returns a shared skipped phase.
+     *
+     * @return skipped phase
+     */
+    public static Skipped getPhase() {
+        return SINGLETON;
+    }
+
+    // Prevent object construction; use getPhase()
+    private Skipped() {
+    }
+
+    @Override
+    public IntentData data() {
+        return null;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
index c7dfb77..284a973 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/GossipIntentStore.java
@@ -35,11 +35,11 @@
 import org.onosproject.net.intent.Intent;
 import org.onosproject.net.intent.IntentData;
 import org.onosproject.net.intent.IntentEvent;
+import org.onosproject.net.intent.WorkPartitionService;
 import org.onosproject.net.intent.IntentState;
 import org.onosproject.net.intent.IntentStore;
 import org.onosproject.net.intent.IntentStoreDelegate;
 import org.onosproject.net.intent.Key;
-import org.onosproject.net.intent.WorkPartitionService;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.Timestamp;
 import org.onosproject.store.serializers.KryoNamespaces;
@@ -141,19 +141,26 @@
                 .withName("intent-current")
                 .withSerializer(intentSerializer)
                 .withTimestampProvider((key, intentData) ->
-                                               new MultiValuedTimestamp<>(
-                                                       intentData == null ? new WallClockTimestamp() :
-                                                               intentData.version(),
-                                                                          sequenceNumber.getAndIncrement()))
+                        new MultiValuedTimestamp<>(intentData == null ?
+                            new WallClockTimestamp() : intentData.version(),
+                                                   sequenceNumber.getAndIncrement()))
                 .withPeerUpdateFunction((key, intentData) -> getPeerNodes(key, intentData));
 
         EventuallyConsistentMapBuilder pendingECMapBuilder =
                 storageService.<Key, IntentData>eventuallyConsistentMapBuilder()
                 .withName("intent-pending")
                 .withSerializer(intentSerializer)
-                .withTimestampProvider((key, intentData) -> intentData == null ?
-                        new MultiValuedTimestamp<>(new WallClockTimestamp(), System.nanoTime()) :
-                        new MultiValuedTimestamp<>(intentData.version(), System.nanoTime()))
+                .withTimestampProvider((key, intentData) ->
+                        /*
+                            We always want to accept new values in the pending map,
+                            so we should use a high performance logical clock.
+                        */
+                        /*
+                            TODO We use the wall clock for the time being, but
+                            this could result in issues if there is clock skew
+                            across instances.
+                         */
+                        new MultiValuedTimestamp<>(new WallClockTimestamp(), System.nanoTime()))
                 .withPeerUpdateFunction((key, intentData) -> getPeerNodes(key, intentData));
         if (initiallyPersistent) {
             currentECMapBuilder = currentECMapBuilder.withPersistence();
@@ -285,17 +292,19 @@
             } else {
                 currentMap.put(newData.key(), new IntentData(newData));
             }
-
-            // Remove the intent data from the pending map if the newData is more
-            // recent or equal to the existing entry.
-            pendingMap.compute(newData.key(), (key, existingValue) -> {
-                if (existingValue == null || !existingValue.version().isNewerThan(newData.version())) {
-                    return null;
-                } else {
-                    return existingValue;
-                }
-            });
         }
+        /*
+         * Remove the intent data from the pending map if the newData is more
+         * recent or equal to the existing entry. No matter if it is an acceptable
+         * update or not.
+         */
+        pendingMap.compute(newData.key(), (key, existingValue) -> {
+            if (existingValue == null || !existingValue.version().isNewerThan(newData.version())) {
+                return null;
+            } else {
+                return existingValue;
+            }
+        });
     }
 
     private Collection<NodeId> getPeerNodes(Key key, IntentData data) {
@@ -363,7 +372,12 @@
         Timestamp version = data.version() != null ? data.version() : new WallClockTimestamp();
         pendingMap.compute(data.key(), (key, existingValue) -> {
             if (existingValue == null || existingValue.version().isOlderThan(version)) {
-                return new IntentData(data.intent(), data.state(),
+                /*
+                 * This avoids to create Intent with state == request, which can
+                 * be problematic if the Intent state is different from *REQ
+                 * {INSTALL_, WITHDRAW_ and PURGE_}.
+                 */
+                return new IntentData(data.intent(), data.state(), data.request(),
                                       version, clusterService.getLocalNode().id());
             } else {
                 return existingValue;
@@ -389,6 +403,11 @@
     }
 
     @Override
+    public IntentData getPendingData(Key intentKey) {
+        return pendingMap.get(intentKey);
+    }
+
+    @Override
     public Iterable<IntentData> getPendingData(boolean localOnly, long olderThan) {
         long now = System.currentTimeMillis();
         final WallClockTimestamp time = new WallClockTimestamp(now - olderThan);
@@ -403,7 +422,6 @@
         @Override
         public void event(EventuallyConsistentMapEvent<Key, IntentData> event) {
             IntentData intentData = event.value();
-
             if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
                 // The current intents map has been updated. If we are master for
                 // this intent's partition, notify the Manager that it should
diff --git a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
index fb3e4f0..273d409 100644
--- a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
@@ -15,6 +15,7 @@
  */
 package org.onlab.util;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -22,6 +23,7 @@
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -40,10 +42,10 @@
     private final int maxBatchMillis;
     private final int maxIdleMillis;
 
-    private volatile TimerTask idleTask = new ProcessorTask();
-    private volatile TimerTask maxTask = new ProcessorTask();
+    private final AtomicReference<TimerTask> idleTask = new AtomicReference<>();
+    private final AtomicReference<TimerTask> maxTask = new AtomicReference<>();
 
-    private List<T> items = Lists.newArrayList();
+    private final List<T> items;
 
     /**
      * Creates an item accumulator capable of triggering on the specified
@@ -52,6 +54,11 @@
      * @param timer          timer to use for scheduling check-points
      * @param maxItems       maximum number of items to accumulate before
      *                       processing is triggered
+     *                       <p>
+     *                       NB: It is possible that processItems will contain
+     *                       more than maxItems under high load or if isReady()
+     *                       can return false.
+     *                       </p>
      * @param maxBatchMillis maximum number of millis allowed since the first
      *                       item before processing is triggered
      * @param maxIdleMillis  maximum number millis between items before
@@ -68,103 +75,118 @@
         this.maxItems = maxItems;
         this.maxBatchMillis = maxBatchMillis;
         this.maxIdleMillis = maxIdleMillis;
+
+        items = Lists.newArrayListWithExpectedSize(maxItems);
     }
 
     @Override
-    public synchronized void add(T item) {
-        idleTask = cancelIfActive(idleTask);
-        items.add(checkNotNull(item, "Item cannot be null"));
+    public void add(T item) {
+        final int sizeAtTimeOfAdd;
+        synchronized (items) {
+            items.add(item);
+            sizeAtTimeOfAdd = items.size();
+        }
+
+        /*
+            WARNING: It is possible that the item that was just added to the list
+            has been processed by an existing idle task at this point.
+
+            By rescheduling the following timers, it is possible that a
+            superfluous maxTask is generated now OR that the idle task and max
+            task are scheduled at their specified delays. This could result in
+            calls to processItems sooner than expected.
+         */
 
         // Did we hit the max item threshold?
-        if (items.size() >= maxItems) {
-            maxTask = cancelIfActive(maxTask);
-            scheduleNow();
+        if (sizeAtTimeOfAdd >= maxItems) {
+            if (maxIdleMillis < maxBatchMillis) {
+                cancelTask(idleTask);
+            }
+            rescheduleTask(maxTask, 0 /* now! */);
         } else {
             // Otherwise, schedule idle task and if this is a first item
             // also schedule the max batch age task.
-            idleTask = schedule(maxIdleMillis);
-            if (items.size() == 1) {
-                maxTask = schedule(maxBatchMillis);
+            if (maxIdleMillis < maxBatchMillis) {
+                rescheduleTask(idleTask, maxIdleMillis);
+            }
+            if (sizeAtTimeOfAdd == 1) {
+                rescheduleTask(maxTask, maxBatchMillis);
             }
         }
     }
 
     /**
-     * Finalizes the current batch, if ready, and schedules a new processor
-     * in the immediate future.
+     * Reschedules the specified task, cancelling existing one if applicable.
+     *
+     * @param taskRef task reference
+     * @param millis delay in milliseconds
      */
-    private void scheduleNow() {
-        if (isReady()) {
-            TimerTask task = new ProcessorTask(finalizeCurrentBatch());
-            timer.schedule(task, 1);
-        }
+    private void rescheduleTask(AtomicReference<TimerTask> taskRef, long millis) {
+        ProcessorTask newTask = new ProcessorTask();
+        timer.schedule(newTask, millis);
+        swapAndCancelTask(taskRef, newTask);
     }
 
     /**
-     * Schedules a new processor task given number of millis in the future.
-     * Batch finalization is deferred to time of execution.
+     * Cancels the specified task if it has not run or is not running.
+     *
+     * @param taskRef task reference
      */
-    private TimerTask schedule(int millis) {
-        TimerTask task = new ProcessorTask();
-        timer.schedule(task, millis);
-        return task;
+    private void cancelTask(AtomicReference<TimerTask> taskRef) {
+        swapAndCancelTask(taskRef, null);
     }
 
     /**
-     * Cancels the specified task if it is active.
+     * Sets the new task and attempts to cancelTask the old one.
+     *
+     * @param taskRef task reference
+     * @param newTask new task
      */
-    private TimerTask cancelIfActive(TimerTask task) {
-        if (task != null) {
-            task.cancel();
+    private void swapAndCancelTask(AtomicReference<TimerTask> taskRef,
+                                        TimerTask newTask) {
+        TimerTask oldTask = taskRef.getAndSet(newTask);
+        if (oldTask != null) {
+            oldTask.cancel();
         }
-        return task;
     }
 
     // Task for triggering processing of accumulated items
     private class ProcessorTask extends TimerTask {
-
-        private final List<T> items;
-
-        // Creates a new processor task with deferred batch finalization.
-        ProcessorTask() {
-            this.items = null;
-        }
-
-        // Creates a new processor task with pre-emptive batch finalization.
-        ProcessorTask(List<T> items) {
-            this.items = items;
-        }
-
         @Override
         public void run() {
-            synchronized (AbstractAccumulator.this) {
-                idleTask = cancelIfActive(idleTask);
-            }
-            if (isReady()) {
-                try {
-                    synchronized (AbstractAccumulator.this) {
-                        maxTask = cancelIfActive(maxTask);
-                    }
-                    List<T> batch = items != null ? items : finalizeCurrentBatch();
+            try {
+                if (isReady()) {
+
+                    List<T> batch = finalizeCurrentBatch();
                     if (!batch.isEmpty()) {
                         processItems(batch);
                     }
-                } catch (Exception e) {
-                    log.warn("Unable to process batch due to", e);
+                } else {
+                    rescheduleTask(idleTask, maxIdleMillis);
                 }
-            } else {
-                synchronized (AbstractAccumulator.this) {
-                    idleTask = schedule(maxIdleMillis);
-                }
+            } catch (Exception e) {
+                log.warn("Unable to process batch due to", e);
             }
         }
     }
 
-    // Demotes and returns the current batch of items and promotes a new one.
-    private synchronized List<T> finalizeCurrentBatch() {
-        List<T> toBeProcessed = items;
-        items = Lists.newArrayList();
-        return toBeProcessed;
+    /**
+     * Returns an immutable copy of the existing items and clear the list.
+     *
+     * @return list of existing items
+     */
+    private List<T> finalizeCurrentBatch() {
+        List<T> finalizedList;
+        synchronized (items) {
+            finalizedList = ImmutableList.copyOf(items);
+            items.clear();
+            /*
+             * To avoid reprocessing being triggered on an empty list.
+             */
+            cancelTask(maxTask);
+            cancelTask(idleTask);
+        }
+        return finalizedList;
     }
 
     @Override
diff --git a/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
index adb0e95..d935cd8 100644
--- a/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
+++ b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
@@ -20,9 +20,7 @@
 import java.util.List;
 import java.util.stream.IntStream;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 import static org.onlab.junit.TestTools.assertAfter;
 
 /**
@@ -142,7 +140,8 @@
         IntStream.range(0, 1000).forEach(i -> accumulator.add(new TestItem("#" + i)));
         timer.advanceTimeMillis(1);
         assertAfter(100, () -> assertEquals("wrong item count", 1000, accumulator.itemCount));
-        assertEquals("wrong batch count", 200, accumulator.batchCount);
+        //TODO this assertion could fail under heavy load
+        assertTrue("batch count not near 200", Math.abs(200 - accumulator.batchCount) < 10);
     }
 
     private class TestItem {