Implements in-order flow objective execution

Change-Id: I6d11e25bc44c07ef8488e51b63c3ff8a88d98f9c
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index fdba591..20e55a5 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -80,12 +80,12 @@
 /**
  * Provides implementation of the flow objective programming service.
  */
-@Component(immediate = true)
+@Component(enabled = false)
 @Service
 public class FlowObjectiveManager implements FlowObjectiveService {
 
-    public static final int INSTALL_RETRY_ATTEMPTS = 5;
-    public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
+    private static final int INSTALL_RETRY_ATTEMPTS = 5;
+    private static final long INSTALL_RETRY_INTERVAL = 1000; // ms
 
     private static final String WORKER_PATTERN = "objective-installer-%d";
     private static final String GROUP_THREAD_NAME = "onos/objective-installer";
@@ -111,9 +111,11 @@
     // Note: The following dependencies are added on behalf of the pipeline
     // driver behaviours to assure these services are available for their
     // initialization.
+    @SuppressWarnings("unused")
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected FlowRuleService flowRuleService;
 
+    @SuppressWarnings("unused")
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected GroupService groupService;
 
@@ -123,31 +125,30 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ComponentConfigService cfgService;
 
-    private final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
+    final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
 
     private final Map<DeviceId, DriverHandler> driverHandlers = Maps.newConcurrentMap();
-    private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
+    protected final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
 
     private final PipelinerContext context = new InnerPipelineContext();
     private final DeviceListener deviceListener = new InnerDeviceListener();
     private final DriverListener driverListener = new InnerDriverListener();
 
-    protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
+    private ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
 
     // local stores for queuing fwd and next objectives that are waiting for an
     // associated next objective execution to complete. The signal for completed
     // execution comes from a pipeline driver, in this or another controller
     // instance, via the DistributedFlowObjectiveStore.
-    private final Map<Integer, Set<PendingFlowObjective>> pendingForwards =
-            Maps.newConcurrentMap();
-    private final Map<Integer, List<PendingFlowObjective>> pendingNexts =
-            Maps.newConcurrentMap();
+    // TODO Make these caches so that stale entries time out
+    final Map<Integer, Set<PendingFlowObjective>> pendingForwards = Maps.newConcurrentMap();
+    final Map<Integer, List<PendingFlowObjective>> pendingNexts = Maps.newConcurrentMap();
 
     // local store to track which nextObjectives were sent to which device
     // for debugging purposes
     private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
 
-    private ExecutorService executorService;
+    ExecutorService executorService;
 
     @Activate
     protected void activate() {
@@ -196,17 +197,17 @@
      * make a few attempts to find the appropriate driver, then eventually give
      * up and report an error if no suitable driver could be found.
      */
-    private class ObjectiveInstaller implements Runnable {
-        private final DeviceId deviceId;
-        private final Objective objective;
+    class ObjectiveInstaller implements Runnable {
+        final DeviceId deviceId;
+        final Objective objective;
 
         private final int numAttempts;
 
-        public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
+        ObjectiveInstaller(DeviceId deviceId, Objective objective) {
             this(deviceId, objective, 1);
         }
 
-        public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
+        ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
             this.deviceId = checkNotNull(deviceId);
             this.objective = checkNotNull(objective);
             this.numAttempts = attemps;
@@ -281,7 +282,7 @@
     public void initPolicy(String policy) {
     }
 
-    private boolean queueFwdObjective(DeviceId deviceId, ForwardingObjective fwd) {
+    boolean queueFwdObjective(DeviceId deviceId, ForwardingObjective fwd) {
         boolean queued = false;
         synchronized (pendingForwards) {
             // double check the flow objective store, because this block could run
@@ -306,8 +307,7 @@
         return queued;
     }
 
-    private boolean queueNextObjective(DeviceId deviceId, NextObjective next) {
-
+    boolean queueNextObjective(DeviceId deviceId, NextObjective next) {
         // we need to hold off on other operations till we get notified that the
         // initial group creation has succeeded
         boolean queued = false;
@@ -578,11 +578,11 @@
      * removeFromExisting) waiting for a next objective with the same id to
      * complete execution.
      */
-    private class PendingFlowObjective {
+    protected class PendingFlowObjective {
         private final DeviceId deviceId;
         private final Objective flowObj;
 
-        public PendingFlowObjective(DeviceId deviceId, Objective flowObj) {
+        PendingFlowObjective(DeviceId deviceId, Objective flowObj) {
             this.deviceId = deviceId;
             this.flowObj = flowObj;
         }
@@ -609,11 +609,9 @@
                 return false;
             }
             final PendingFlowObjective other = (PendingFlowObjective) obj;
-            if (this.deviceId.equals(other.deviceId) &&
-                    this.flowObj.equals(other.flowObj)) {
-                return true;
-            }
-            return false;
+
+            return this.deviceId.equals(other.deviceId) &&
+                    this.flowObj.equals(other.flowObj);
         }
     }
 
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
new file mode 100644
index 0000000..f6e6dd8
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -0,0 +1,376 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.flowobjective.impl;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimaps;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.flowobjective.ObjectiveEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+@Component(immediate = true, enabled = true)
+@Service
+public class InOrderFlowObjectiveManager extends FlowObjectiveManager {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    // TODO Make these caches so that stale entries time out
+    private ListMultimap<FiltObjQueueKey, Objective> filtObjQueue =
+            Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
+    private ListMultimap<FwdObjQueueKey, Objective> fwdObjQueue =
+            Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
+    private ListMultimap<NextObjQueueKey, Objective> nextObjQueue =
+            Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
+
+    final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
+
+    @Activate
+    protected void activate() {
+        super.activate();
+        // Replace the store delegate so that entries of pendingForwards and pendingNexts are
+        // resubmitted through this class's execute()
+        flowObjectiveStore.unsetDelegate(super.delegate);
+        flowObjectiveStore.setDelegate(delegate);
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        super.deactivate();
+    }
+
+    /**
+     * Wraps the given objective with a context that dequeues it on completion, then enqueues it.
+     * Objectives submitted through this method are guaranteed to be executed in order.
+     *
+     * @param deviceId Device ID
+     * @param originalObjective Flow objective to be executed
+     */
+    private void process(DeviceId deviceId, Objective originalObjective) {
+        // Inject ObjectiveContext such that we can get notified when it is completed
+        Objective.Builder objBuilder = originalObjective.copy();
+        Optional<ObjectiveContext> originalContext = originalObjective.context();
+        ObjectiveContext context = new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                log.trace("Flow objective onSuccess {}", objective);
+                dequeue(deviceId, objective);
+                originalContext.ifPresent(c -> c.onSuccess(objective));
+            }
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                log.trace("Flow objective onError {}", objective);
+                dequeue(deviceId, objective);
+                originalContext.ifPresent(c -> c.onError(objective, error));
+            }
+        };
+
+        // Rebuild the objective with the wrapping context, preserving Objective.Operation
+        Objective objective;
+        switch (originalObjective.op()) {
+            case ADD:
+                objective = objBuilder.add(context);
+                break;
+            case ADD_TO_EXISTING:
+                objective = ((NextObjective.Builder) objBuilder).addToExisting(context);
+                break;
+            case REMOVE:
+                objective = objBuilder.remove(context);
+                break;
+            case REMOVE_FROM_EXISTING:
+                objective = ((NextObjective.Builder) objBuilder).removeFromExisting(context);
+                break;
+            case MODIFY:
+                objective = ((NextObjective.Builder) objBuilder).modify(context);
+                break;
+            case VERIFY:
+                objective = ((NextObjective.Builder) objBuilder).verify(context);
+                break;
+            default:
+                log.error("Unknown flow objective operation {}", originalObjective.op());
+                return;
+        }
+
+        enqueue(deviceId, objective);
+    }
+
+    @Override
+    public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
+        process(deviceId, filteringObjective);
+    }
+
+    @Override
+    public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
+        process(deviceId, forwardingObjective);
+    }
+
+    @Override
+    public void next(DeviceId deviceId, NextObjective nextObjective) {
+        process(deviceId, nextObjective);
+    }
+
+    /**
+     * Enqueue flow objective. Execute the flow objective if there is no pending objective ahead.
+     *
+     * @param deviceId Device ID
+     * @param obj Flow objective
+     */
+    private synchronized void enqueue(DeviceId deviceId, Objective obj) {
+        int queueSize;
+        int priority = obj.priority();
+
+        if (obj instanceof FilteringObjective) {
+            log.debug("Enqueue filtering objective {}", obj);
+            FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
+            filtObjQueue.put(k, obj);
+            queueSize = filtObjQueue.get(k).size();
+            log.debug("Filtering objective queue size {}", queueSize);
+        } else if (obj instanceof ForwardingObjective) {
+            log.debug("Enqueue forwarding objective {}", obj);
+            FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
+            fwdObjQueue.put(k, obj);
+            queueSize = fwdObjQueue.get(k).size();
+            log.debug("Forwarding objective queue size {}", queueSize);
+        } else if (obj instanceof NextObjective) {
+            log.debug("Enqueue next objective {}", obj);
+            NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
+            nextObjQueue.put(k, obj);
+            queueSize = nextObjQueue.get(k).size();
+            log.debug("Next objective queue size {}", queueSize);
+        } else {
+            log.error("Unknown flow objective instance: {}", obj.getClass().getName());
+            return;
+        }
+
+        // Execute immediately if there is no pending obj ahead
+        if (queueSize == 1) {
+            log.debug("First one. Submit objective installer, deviceId {}, obj {}", deviceId, obj);
+            execute(deviceId, obj);
+        }
+    }
+
+    /**
+     * Dequeues a finished flow objective and executes the next one queued under the same key, if any.
+     *
+     * @param deviceId Device ID
+     * @param obj Flow objective whose context reported success or error
+     */
+    private synchronized void dequeue(DeviceId deviceId, Objective obj) {
+        List<Objective> remaining;
+        int priority = obj.priority();
+
+        if (obj instanceof FilteringObjective) {
+            log.debug("Dequeue filtering objective {}", obj);
+            FiltObjQueueKey k = new FiltObjQueueKey(deviceId, priority, ((FilteringObjective) obj).key());
+            filtObjQueue.remove(k, obj);
+            remaining = filtObjQueue.get(k);
+            log.debug("Filtering objective queue size {}", remaining.size());
+        } else if (obj instanceof ForwardingObjective) {
+            log.debug("Dequeue forwarding objective {}", obj);
+            FwdObjQueueKey k = new FwdObjQueueKey(deviceId, priority, ((ForwardingObjective) obj).selector());
+            fwdObjQueue.remove(k, obj);
+            remaining = fwdObjQueue.get(k);
+            log.debug("Forwarding objective queue size {}", remaining.size());
+        } else if (obj instanceof NextObjective) {
+            log.debug("Dequeue next objective {}", obj);
+            NextObjQueueKey k = new NextObjQueueKey(deviceId, obj.id());
+            nextObjQueue.remove(k, obj);
+            remaining = nextObjQueue.get(k);
+            log.debug("Next objective queue size {}", remaining.size());
+        } else {
+            log.error("Unknown flow objective instance: {}", obj.getClass().getName());
+            return;
+        }
+
+        // Submit the new head of the queue, if any; log that objective, not the one just removed
+        if (!remaining.isEmpty()) {
+            log.debug("Next one. Submit objective installer, deviceId {}, obj {}", deviceId, remaining.get(0));
+            execute(deviceId, remaining.get(0));
+        }
+    }
+
+    /**
+     * Submit the flow objective. Starting from this point on, the execution order is not guaranteed.
+     * Therefore we must be certain that this method is called in-order.
+     *
+     * @param deviceId Device ID
+     * @param obj Flow objective
+     */
+    private void execute(DeviceId deviceId, Objective obj) {
+        if (obj instanceof FilteringObjective) {
+            super.filter(deviceId, (FilteringObjective) obj);
+        } else if (obj instanceof ForwardingObjective) {
+            super.forward(deviceId, (ForwardingObjective) obj);
+        } else if (obj instanceof NextObjective) {
+            super.next(deviceId, (NextObjective) obj);
+        } else {
+            log.error("Unknown flow objective instance: {}", obj.getClass().getName());
+        }
+    }
+
+    private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
+        @Override
+        public void notify(ObjectiveEvent event) {
+            if (event.type() == ObjectiveEvent.Type.ADD) {
+                log.debug("Received notification of obj event {}", event);
+                Set<PendingFlowObjective> pending;
+
+                // first send all pending flows
+                synchronized (pendingForwards) {
+                    // needs to be synchronized for queueObjective lookup
+                    pending = pendingForwards.remove(event.subject());
+                }
+                if (pending == null) {
+                    log.debug("No forwarding objectives pending for this "
+                            + "obj event {}", event);
+                } else {
+                    log.debug("Processing {} pending forwarding objectives for nextId {}",
+                            pending.size(), event.subject());
+                    // resubmitted back to the execution queue
+                    pending.forEach(p -> execute(p.deviceId(), p.flowObjective()));
+                }
+
+                // now check for pending next-objectives
+                // Note: This is still necessary despite the existence of in-order execution,
+                //       because in-order execution does not handle the case of
+                //       ADD_TO_EXISTING arriving before ADD.
+                List<PendingFlowObjective> pendNexts;
+                synchronized (pendingNexts) {
+                    // needs to be synchronized for queueObjective lookup
+                    pendNexts = pendingNexts.remove(event.subject());
+                }
+                if (pendNexts == null) {
+                    log.debug("No next objectives pending for this "
+                            + "obj event {}", event);
+                } else {
+                    log.debug("Processing {} pending next objectives for nextId {}",
+                            pendNexts.size(), event.subject());
+                    // resubmitted back to the execution queue
+                    pendNexts.forEach(p -> execute(p.deviceId(), p.flowObjective()));
+                }
+            }
+        }
+    }
+
+    private static class FiltObjQueueKey { // immutable key of filtObjQueue
+        private final DeviceId deviceId;
+        private final int priority;
+        private final Criterion key;
+
+        FiltObjQueueKey(DeviceId deviceId, int priority, Criterion key) {
+            this.deviceId = deviceId;
+            this.priority = priority;
+            this.key = key;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(deviceId, priority, key);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (this == other) {
+                return true;
+            }
+            if (!(other instanceof FiltObjQueueKey)) {
+                return false;
+            }
+            FiltObjQueueKey that = (FiltObjQueueKey) other;
+            return Objects.equals(this.deviceId, that.deviceId) &&
+                    this.priority == that.priority && // primitive compare, no boxing
+                    Objects.equals(this.key, that.key);
+        }
+    }
+
+    private static class FwdObjQueueKey { // immutable key of fwdObjQueue
+        private final DeviceId deviceId;
+        private final int priority;
+        private final TrafficSelector selector;
+
+        FwdObjQueueKey(DeviceId deviceId, int priority, TrafficSelector selector) {
+            this.deviceId = deviceId;
+            this.priority = priority;
+            this.selector = selector;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(deviceId, priority, selector);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (this == other) {
+                return true;
+            }
+            if (!(other instanceof FwdObjQueueKey)) {
+                return false;
+            }
+            FwdObjQueueKey that = (FwdObjQueueKey) other;
+            return Objects.equals(this.deviceId, that.deviceId) &&
+                    this.priority == that.priority && // primitive compare, no boxing
+                    Objects.equals(this.selector, that.selector);
+        }
+    }
+
+    private static class NextObjQueueKey { // immutable key of nextObjQueue
+        private final DeviceId deviceId;
+        private final int id;
+
+        NextObjQueueKey(DeviceId deviceId, int id) {
+            this.deviceId = deviceId;
+            this.id = id;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(deviceId, id);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            if (this == other) {
+                return true;
+            }
+            if (!(other instanceof NextObjQueueKey)) {
+                return false;
+            }
+            NextObjQueueKey that = (NextObjQueueKey) other;
+            return Objects.equals(this.deviceId, that.deviceId) &&
+                    this.id == that.id; // primitive compare, no boxing
+        }
+    }
+}
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
new file mode 100644
index 0000000..bb89ced
--- /dev/null
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
@@ -0,0 +1,373 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.flowobjective.impl;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.MplsLabel;
+import org.onlab.packet.VlanId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.net.behaviour.Pipeliner;
+import org.onosproject.net.behaviour.PipelinerAdapter;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flowobjective.DefaultFilteringObjective;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.DefaultNextObjective;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.flowobjective.ObjectiveEvent;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.onlab.junit.TestTools.assertAfter;
+import static org.onlab.util.Tools.groupedThreads;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+public class InOrderFlowObjectiveManagerTest {
+    private InOrderFlowObjectiveManager mgr;
+
+    private static final int PRIORITY = 1000;
+    private static final ApplicationId APP_ID = new DefaultApplicationId(1, "org.onosproject.test");
+    private static final DeviceId DEV1 = DeviceId.deviceId("of:1");
+    private static final PortNumber P1 = PortNumber.portNumber(1);
+    private static final PortNumber P2 = PortNumber.portNumber(2);
+    private static final PortNumber P3 = PortNumber.portNumber(3);
+    private static final PortNumber P4 = PortNumber.portNumber(4);
+    private static final MacAddress M1 = MacAddress.valueOf("00:00:00:00:00:01");
+    private static final MacAddress M2 = MacAddress.valueOf("00:00:00:00:00:02");
+    private static final MacAddress M3 = MacAddress.valueOf("00:00:00:00:00:03");
+    private static final VlanId V1 = VlanId.vlanId((short) 10);
+    private static final VlanId V2 = VlanId.vlanId((short) 20);
+    private static final VlanId V3 = VlanId.vlanId((short) 30);
+    private static final TrafficSelector S1 = DefaultTrafficSelector.builder()
+            .matchEthType(Ethernet.TYPE_IPV4).matchIPDst(IpPrefix.valueOf("10.0.0.1/32")).build();
+    private static final TrafficSelector S2 = DefaultTrafficSelector.builder()
+            .matchEthType(Ethernet.TYPE_IPV4).matchIPDst(IpPrefix.valueOf("10.0.0.2/32")).build();
+    private static final int NID1 = 1;
+    private static final int NID2 = 2;
+    private static final NextGroup NGRP1 = () -> new byte[] {0x00, 0x01};
+    private static final NextGroup NGRP2 = () -> new byte[] {0x02, 0x03};
+
+    // Delay flow objectives OFFSET + rand(0, BOUND) millis
+    private static final int OFFSET = 10; // ms
+    private static final int BOUND = 40; // ms
+
+    private static final FilteringObjective FILT1 = buildFilteringObjective(P2, V3, M3, 1).add();
+    private static final FilteringObjective FILT2 = buildFilteringObjective(P2, V2, M2, 2).add();
+    private static final FilteringObjective FILT3 = buildFilteringObjective(P2, V3, M3, 3).remove();
+    private static final FilteringObjective FILT4 = buildFilteringObjective(P1, V1, M1, 4).add();
+    private static final FilteringObjective FILT5 = buildFilteringObjective(P2, V2, M2, 5).remove();
+    private static final FilteringObjective FILT6 = buildFilteringObjective(P1, V1, M1, 6).remove();
+    private static final FilteringObjective FILT7 = buildFilteringObjective(P2, V3, M3, 7).add();
+    private List<FilteringObjective> expectFiltObjs = Lists.newCopyOnWriteArrayList(
+            Lists.newArrayList(FILT1, FILT2, FILT3, FILT4, FILT5, FILT6, FILT7));
+
+    private static final NextObjective NEXT1 = buildNextObjective(NID1, V1, Sets.newHashSet(P1)).add();
+    private static final NextObjective NEXT2 = buildNextObjective(NID2, V2, Sets.newHashSet(P3)).add();
+    private static final NextObjective NEXT3 = buildNextObjective(NID1, V1, Sets.newHashSet(P1, P2)).addToExisting();
+    private static final NextObjective NEXT4 = buildNextObjective(NID2, V2, Sets.newHashSet(P3, P4)).addToExisting();
+    private static final NextObjective NEXT5 = buildNextObjective(NID1, V1, Sets.newHashSet(P1)).removeFromExisting();
+    private static final NextObjective NEXT6 = buildNextObjective(NID2, V2, Sets.newHashSet(P3)).removeFromExisting();
+    private static final NextObjective NEXT7 = buildNextObjective(NID1, V1, Sets.newHashSet()).remove();
+    private static final NextObjective NEXT8 = buildNextObjective(NID2, V2, Sets.newHashSet()).remove();
+    private List<NextObjective> expectNextObjs = Lists.newCopyOnWriteArrayList(
+            Lists.newArrayList(NEXT1, NEXT2, NEXT3, NEXT4, NEXT5, NEXT6, NEXT7, NEXT8));
+    private List<NextObjective> expectNextObjsPending = Lists.newCopyOnWriteArrayList(
+            Lists.newArrayList(NEXT5, NEXT6, NEXT1, NEXT2, NEXT3, NEXT4, NEXT7, NEXT8));
+
+    private static final ForwardingObjective FWD1 = buildFwdObjective(S1, NID1).add();
+    private static final ForwardingObjective FWD2 = buildFwdObjective(S2, NID2).add();
+    private static final ForwardingObjective FWD3 = buildFwdObjective(S1, NID2).add();
+    private static final ForwardingObjective FWD4 = buildFwdObjective(S2, NID1).add();
+    private static final ForwardingObjective FWD5 = buildFwdObjective(S1, NID2).remove();
+    private static final ForwardingObjective FWD6 = buildFwdObjective(S2, NID1).remove();
+    private List<ForwardingObjective> expectFwdObjs = Lists.newCopyOnWriteArrayList(
+            Lists.newArrayList(FWD1, FWD2, FWD3, FWD4, FWD5, FWD6));
+
+    private List<Objective> actualObjs = Lists.newCopyOnWriteArrayList();
+
+    private Pipeliner pipeliner = new PipelinerAdapter() {
+        @Override
+        public void filter(FilteringObjective filterObjective) {
+            recordObjective(filterObjective);
+        }
+
+        @Override
+        public void forward(ForwardingObjective forwardObjective) {
+            recordObjective(forwardObjective);
+        }
+
+        @Override
+        public void next(NextObjective nextObjective) {
+            recordObjective(nextObjective);
+
+            // Notify delegate when the next obj is completed
+            ObjectiveEvent.Type type;
+            if (nextObjective.op() == Objective.Operation.ADD ||
+                    nextObjective.op() == Objective.Operation.ADD_TO_EXISTING) {
+                type = ObjectiveEvent.Type.ADD;
+            } else if (nextObjective.op() == Objective.Operation.REMOVE ||
+                    nextObjective.op() == Objective.Operation.REMOVE_FROM_EXISTING) {
+                type = ObjectiveEvent.Type.REMOVE;
+            } else {
+                return;
+            }
+            mgr.delegate.notify(new ObjectiveEvent(type, nextObjective.id()));
+        }
+
+        /**
+         * Record the objectives.
+         * The random delay is introduced in order to mimic pipeline and flow operation behavior.
+         *
+         * @param obj Flow objective
+         */
+        private void recordObjective(Objective obj) {
+            try {
+                Thread.sleep(new Random().nextInt(BOUND) + OFFSET);
+                actualObjs.add(obj);
+                obj.context().ifPresent(c -> c.onSuccess(obj));
+            } catch (Exception e) {
+                obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.UNKNOWN));
+            }
+        }
+    };
+
+    @Before
+    public void setUp() {
+        actualObjs.clear();
+        // Fresh manager for each test: mocked store, 4-thread executor and the stub pipeliner for DEV1
+        mgr = new InOrderFlowObjectiveManager();
+        mgr.flowObjectiveStore = createMock(FlowObjectiveStore.class);
+        mgr.executorService = newFixedThreadPool(4, groupedThreads("foo", "bar"));
+        mgr.pipeliners.put(DEV1, pipeliner);
+    }
+
+    @Test
+    public void filter() throws Exception {
+        expectFiltObjs.forEach(objective -> mgr.filter(DEV1, objective));
+
+        // Wait until the pipeliner has recorded every submitted objective
+        final int delay = 7 * (BOUND + OFFSET);
+        assertAfter(delay, 5 * delay, () -> assertEquals(expectFiltObjs.size(), actualObjs.size()));
+
+        assertTrue(actualObjs.indexOf(FILT1) < actualObjs.indexOf(FILT2));
+        assertTrue(actualObjs.indexOf(FILT2) < actualObjs.indexOf(FILT3));
+        assertTrue(actualObjs.indexOf(FILT3) < actualObjs.indexOf(FILT5));
+        assertTrue(actualObjs.indexOf(FILT5) < actualObjs.indexOf(FILT7));
+        assertTrue(actualObjs.indexOf(FILT4) < actualObjs.indexOf(FILT6));
+    }
+
+    @Test
+    public void forward() throws Exception {
+        FlowObjectiveStore store = mgr.flowObjectiveStore;
+        expect(store.getNextGroup(NID1)).andReturn(NGRP1).times(3);
+        expect(store.getNextGroup(NID2)).andReturn(NGRP2).times(3);
+        replay(store);
+
+        expectFwdObjs.forEach(objective -> mgr.forward(DEV1, objective));
+
+        // Wait until the pipeliner has recorded every submitted objective
+        final int delay = 6 * (BOUND + OFFSET);
+        assertAfter(delay, 5 * delay, () -> assertEquals(expectFwdObjs.size(), actualObjs.size()));
+
+        assertTrue(actualObjs.indexOf(FWD1) < actualObjs.indexOf(FWD3));
+        assertTrue(actualObjs.indexOf(FWD3) < actualObjs.indexOf(FWD5));
+        assertTrue(actualObjs.indexOf(FWD2) < actualObjs.indexOf(FWD4));
+        assertTrue(actualObjs.indexOf(FWD4) < actualObjs.indexOf(FWD6));
+        verify(store);
+    }
+
+    @Test
+    public void forwardPending() throws Exception {
+        // Note: current logic will double check if the next obj needs to be queued;
+        //       it does not check when resubmitting a pending next back to the queue
+        FlowObjectiveStore store = mgr.flowObjectiveStore;
+        expect(store.getNextGroup(NID1)).andReturn(null).times(2);
+        expect(store.getNextGroup(NID2)).andReturn(null).times(2);
+        expect(store.getNextGroup(NID1)).andReturn(NGRP1).times(3);
+        expect(store.getNextGroup(NID2)).andReturn(NGRP2).times(3);
+        replay(store);
+
+        expectFwdObjs.forEach(objective -> mgr.forward(DEV1, objective));
+
+        // Submit the next objectives that the forwarding objectives refer to
+        mgr.next(DEV1, NEXT1);
+        mgr.next(DEV1, NEXT2);
+
+        // Wait until the pipeliner has recorded all forwarding objectives plus the two next objectives
+        final int delay = 8 * (BOUND + OFFSET);
+        assertAfter(delay, 5 * delay, () -> assertEquals(expectFwdObjs.size() + 2, actualObjs.size()));
+
+        assertTrue(actualObjs.indexOf(NEXT1) < actualObjs.indexOf(FWD1));
+        assertTrue(actualObjs.indexOf(FWD1) < actualObjs.indexOf(FWD3));
+        assertTrue(actualObjs.indexOf(FWD3) < actualObjs.indexOf(FWD5));
+        assertTrue(actualObjs.indexOf(NEXT2) < actualObjs.indexOf(FWD2));
+        assertTrue(actualObjs.indexOf(FWD2) < actualObjs.indexOf(FWD4));
+        assertTrue(actualObjs.indexOf(FWD4) < actualObjs.indexOf(FWD6));
+        verify(store);
+    }
+
+    @Test
+    public void next() throws Exception {
+        // Note: the ADD operation does not query the store for the next group
+        FlowObjectiveStore store = mgr.flowObjectiveStore;
+        expect(store.getNextGroup(NID1)).andReturn(NGRP1).times(3);
+        expect(store.getNextGroup(NID2)).andReturn(NGRP2).times(3);
+        replay(store);
+
+        expectNextObjs.forEach(objective -> mgr.next(DEV1, objective));
+
+        // Wait until the pipeliner has recorded every submitted objective
+        final int delay = 8 * (BOUND + OFFSET);
+        assertAfter(delay, 5 * delay, () -> assertEquals(expectNextObjs.size(), actualObjs.size()));
+
+        assertTrue(actualObjs.indexOf(NEXT1) < actualObjs.indexOf(NEXT3));
+        assertTrue(actualObjs.indexOf(NEXT3) < actualObjs.indexOf(NEXT5));
+        assertTrue(actualObjs.indexOf(NEXT5) < actualObjs.indexOf(NEXT7));
+        assertTrue(actualObjs.indexOf(NEXT2) < actualObjs.indexOf(NEXT4));
+        assertTrue(actualObjs.indexOf(NEXT4) < actualObjs.indexOf(NEXT6));
+        assertTrue(actualObjs.indexOf(NEXT6) < actualObjs.indexOf(NEXT8));
+        verify(store);
+    }
+
+    // FIXME We currently do not handle the case when an app sends edit/remove of a next id before add.
+    //       The edit/remove operation will be queued by pendingNext, and the add operation will be
+    //       queued by the ordering queue forever due to the deadlock. This can be improved by making
+    //       pendingForwards, pendingNexts and the ordering queue caches.
+    @Test
+    @Ignore("Not supported")
+    public void nextPending() throws Exception {
+        // Note: current logic will double check if the next obj needs to be queued;
+        //       it does not check when resubmitting a pending next back to the queue
+        expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(null).times(6);
+        expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(null).times(6);
+        replay(mgr.flowObjectiveStore);
+
+        expectNextObjsPending.forEach(nextObj -> mgr.next(DEV1, nextObj));
+
+        // Wait for the pipeline operation to complete; compare against the list that was actually submitted
+        int delay = (BOUND + OFFSET) * 8;
+        assertAfter(delay, delay * 5, () -> assertEquals(expectNextObjsPending.size(), actualObjs.size()));
+
+        assertTrue(actualObjs.indexOf(NEXT1) < actualObjs.indexOf(NEXT5));
+        assertTrue(actualObjs.indexOf(NEXT5) < actualObjs.indexOf(NEXT3));
+        assertTrue(actualObjs.indexOf(NEXT3) < actualObjs.indexOf(NEXT7));
+        assertTrue(actualObjs.indexOf(NEXT2) < actualObjs.indexOf(NEXT6));
+        assertTrue(actualObjs.indexOf(NEXT6) < actualObjs.indexOf(NEXT4));
+        assertTrue(actualObjs.indexOf(NEXT4) < actualObjs.indexOf(NEXT8));
+
+        verify(mgr.flowObjectiveStore);
+    }
+
+    /**
+     * Builds a permit filtering objective keyed on the input port. It matches the destination MAC,
+     * VlanId.NONE and an MPLS label that encodes the serial number identifying this occurrence.
+     *
+     * @param portnum Port number used as the key
+     * @param vlanId VLAN Id set by the metadata treatment
+     * @param mac Destination MAC address to match
+     * @param serial Serial number encoded in the MPLS label
+     * @return Filtering objective builder
+     */
+    private static FilteringObjective.Builder buildFilteringObjective(PortNumber portnum, VlanId vlanId,
+                                                                      MacAddress mac, int serial) {
+        // Metadata treatment: push a VLAN header and set it to the requested VLAN id
+        TrafficTreatment.Builder metaBuilder = DefaultTrafficTreatment.builder();
+        metaBuilder.pushVlan().setVlanId(vlanId);
+
+        FilteringObjective.Builder builder = DefaultFilteringObjective.builder();
+        builder.withKey(Criteria.matchInPort(portnum))
+                .addCondition(Criteria.matchEthDst(mac))
+                .addCondition(Criteria.matchVlanId(VlanId.NONE))
+                .addCondition(Criteria.matchMplsLabel(MplsLabel.mplsLabel(serial)))
+                .withPriority(PRIORITY);
+        builder.withMeta(metaBuilder.build());
+        builder.permit().fromApp(APP_ID);
+        return builder;
+    }
+
+    /**
+     * Creates a BROADCAST next objective builder with one pop-VLAN-and-output treatment per port.
+     *
+     * @param nextId next ID
+     * @param vlanId VLAN ID matched by the metadata selector
+     * @param ports Ports to output to, one treatment each
+     *
+     * @return Next objective builder
+     */
+    private static NextObjective.Builder buildNextObjective(int nextId, VlanId vlanId, Collection<PortNumber> ports) {
+        TrafficSelector vlanMeta = DefaultTrafficSelector.builder().matchVlanId(vlanId).build();
+        NextObjective.Builder builder = DefaultNextObjective.builder()
+                .withId(nextId)
+                .withType(NextObjective.Type.BROADCAST)
+                .fromApp(APP_ID)
+                .withMeta(vlanMeta);
+
+        // One treatment per port: pop the VLAN, then output on that port
+        for (PortNumber port : ports) {
+            TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
+            treatment.popVlan();
+            treatment.setOutput(port);
+            builder.addTreatment(treatment.build());
+        }
+
+        return builder;
+    }
+
+    /**
+     * Creates a permanent forwarding objective builder flagged SPECIFIC that points at the given next id.
+     *
+     * @param selector Traffic selector
+     * @param nextId next ID
+     * @return Forwarding objective builder
+     */
+    private static ForwardingObjective.Builder buildFwdObjective(TrafficSelector selector, int nextId) {
+        ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder()
+                .makePermanent()
+                .withSelector(selector).nextStep(nextId)
+                .fromApp(APP_ID).withPriority(PRIORITY)
+                .withFlag(ForwardingObjective.Flag.SPECIFIC);
+
+        return fwdBuilder;
+    }
+}
\ No newline at end of file