Implements in-order flow objective execution

Change-Id: I6d11e25bc44c07ef8488e51b63c3ff8a88d98f9c
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/
new file mode 100644
index 0000000..bb89ced
--- /dev/null
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/
@@ -0,0 +1,373 @@
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.MplsLabel;
+import org.onlab.packet.VlanId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.DefaultApplicationId;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.onlab.junit.TestTools.assertAfter;
+import static org.onlab.util.Tools.groupedThreads;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+public class InOrderFlowObjectiveManagerTest {
+    private InOrderFlowObjectiveManager mgr;
+    private static final int PRIORITY = 1000;
+    private static final ApplicationId APP_ID = new DefaultApplicationId(1, "org.onosproject.test");
+    private static final DeviceId DEV1 = DeviceId.deviceId("of:1");
+    private static final PortNumber P1 = PortNumber.portNumber(1);
+    private static final PortNumber P2 = PortNumber.portNumber(2);
+    private static final PortNumber P3 = PortNumber.portNumber(3);
+    private static final PortNumber P4 = PortNumber.portNumber(4);
+    private static final MacAddress M1 = MacAddress.valueOf("00:00:00:00:00:01");
+    private static final MacAddress M2 = MacAddress.valueOf("00:00:00:00:00:02");
+    private static final MacAddress M3 = MacAddress.valueOf("00:00:00:00:00:03");
+    private static final VlanId V1 = VlanId.vlanId((short) 10);
+    private static final VlanId V2 = VlanId.vlanId((short) 20);
+    private static final VlanId V3 = VlanId.vlanId((short) 30);
+    private static final TrafficSelector S1 = DefaultTrafficSelector.builder()
+            .matchEthType(Ethernet.TYPE_IPV4).matchIPDst(IpPrefix.valueOf("")).build();
+    private static final TrafficSelector S2 = DefaultTrafficSelector.builder()
+            .matchEthType(Ethernet.TYPE_IPV4).matchIPDst(IpPrefix.valueOf("")).build();
+    private static final int NID1 = 1;
+    private static final int NID2 = 2;
+    private static final NextGroup NGRP1 = () -> new byte[] {0x00, 0x01};
+    private static final NextGroup NGRP2 = () -> new byte[] {0x02, 0x03};
+    // Delay flow objectives OFFSET + rand(0, BOUND) millis
+    private static final int OFFSET = 10; // ms
+    private static final int BOUND = 40; // ms
+    private static final FilteringObjective FILT1 = buildFilteringObjective(P2, V3, M3, 1).add();
+    private static final FilteringObjective FILT2 = buildFilteringObjective(P2, V2, M2, 2).add();
+    private static final FilteringObjective FILT3 = buildFilteringObjective(P2, V3, M3, 3).remove();
+    private static final FilteringObjective FILT4 = buildFilteringObjective(P1, V1, M1, 4).add();
+    private static final FilteringObjective FILT5 = buildFilteringObjective(P2, V2, M2, 5).remove();
+    private static final FilteringObjective FILT6 = buildFilteringObjective(P1, V1, M1, 6).remove();
+    private static final FilteringObjective FILT7 = buildFilteringObjective(P2, V3, M3, 7).add();
+    private List<FilteringObjective> expectFiltObjs = Lists.newCopyOnWriteArrayList(
+            Lists.newArrayList(FILT1, FILT2, FILT3, FILT4, FILT5, FILT6, FILT7));
+    private static final NextObjective NEXT1 = buildNextObjective(NID1, V1, Sets.newHashSet(P1)).add();
+    private static final NextObjective NEXT2 = buildNextObjective(NID2, V2, Sets.newHashSet(P3)).add();
+    private static final NextObjective NEXT3 = buildNextObjective(NID1, V1, Sets.newHashSet(P1, P2)).addToExisting();
+    private static final NextObjective NEXT4 = buildNextObjective(NID2, V2, Sets.newHashSet(P3, P4)).addToExisting();
+    private static final NextObjective NEXT5 = buildNextObjective(NID1, V1, Sets.newHashSet(P1)).removeFromExisting();
+    private static final NextObjective NEXT6 = buildNextObjective(NID2, V2, Sets.newHashSet(P3)).removeFromExisting();
+    private static final NextObjective NEXT7 = buildNextObjective(NID1, V1, Sets.newHashSet()).remove();
+    private static final NextObjective NEXT8 = buildNextObjective(NID2, V2, Sets.newHashSet()).remove();
+    private List<NextObjective> expectNextObjs = Lists.newCopyOnWriteArrayList(
+            Lists.newArrayList(NEXT1, NEXT2, NEXT3, NEXT4, NEXT5, NEXT6, NEXT7, NEXT8));
+    private List<NextObjective> expectNextObjsPending = Lists.newCopyOnWriteArrayList(
+            Lists.newArrayList(NEXT5, NEXT6, NEXT1, NEXT2, NEXT3, NEXT4, NEXT7, NEXT8));
+    private static final ForwardingObjective FWD1 = buildFwdObjective(S1, NID1).add();
+    private static final ForwardingObjective FWD2 = buildFwdObjective(S2, NID2).add();
+    private static final ForwardingObjective FWD3 = buildFwdObjective(S1, NID2).add();
+    private static final ForwardingObjective FWD4 = buildFwdObjective(S2, NID1).add();
+    private static final ForwardingObjective FWD5 = buildFwdObjective(S1, NID2).remove();
+    private static final ForwardingObjective FWD6 = buildFwdObjective(S2, NID1).remove();
+    private List<ForwardingObjective> expectFwdObjs = Lists.newCopyOnWriteArrayList(
+            Lists.newArrayList(FWD1, FWD2, FWD3, FWD4, FWD5, FWD6));
+    private List<Objective> actualObjs = Lists.newCopyOnWriteArrayList();
+    private Pipeliner pipeliner = new PipelinerAdapter() {
+        @Override
+        public void filter(FilteringObjective filterObjective) {
+            recordObjective(filterObjective);
+        }
+        @Override
+        public void forward(ForwardingObjective forwardObjective) {
+            recordObjective(forwardObjective);
+        }
+        @Override
+        public void next(NextObjective nextObjective) {
+            recordObjective(nextObjective);
+            // Notify delegate when the next obj is completed
+            ObjectiveEvent.Type type;
+            if (nextObjective.op() == Objective.Operation.ADD ||
+                    nextObjective.op() == Objective.Operation.ADD_TO_EXISTING) {
+                type = ObjectiveEvent.Type.ADD;
+            } else if (nextObjective.op() == Objective.Operation.REMOVE ||
+                    nextObjective.op() == Objective.Operation.REMOVE_FROM_EXISTING) {
+                type = ObjectiveEvent.Type.REMOVE;
+            } else {
+                return;
+            }
+            mgr.delegate.notify(new ObjectiveEvent(type,;
+        }
+        /**
+         * Record the objectives.
+         * The random delay is introduced in order to mimic pipeline and flow operation behavior.
+         *
+         * @param obj Flow objective
+         */
+        private void recordObjective(Objective obj) {
+            try {
+                Thread.sleep(new Random().nextInt(BOUND) + OFFSET);
+                actualObjs.add(obj);
+                obj.context().ifPresent(c -> c.onSuccess(obj));
+            } catch (Exception e) {
+                obj.context().ifPresent(c -> c.onError(obj, ObjectiveError.UNKNOWN));
+            }
+        }
+    };
+    @Before
+    public void setUp() {
+        mgr = new InOrderFlowObjectiveManager();
+        mgr.pipeliners.put(DEV1, pipeliner);
+        mgr.executorService = newFixedThreadPool(4, groupedThreads("foo", "bar"));
+        mgr.flowObjectiveStore = createMock(FlowObjectiveStore.class);
+        actualObjs.clear();
+    }
+    @Test
+    public void filter() throws Exception {
+        expectFiltObjs.forEach(filtObj -> mgr.filter(DEV1, filtObj));
+        // Wait for the pipeline operation to complete
+        int expectedTime = (BOUND + OFFSET) * 7;
+        assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFiltObjs.size(), actualObjs.size()));
+        assertTrue(actualObjs.indexOf(FILT1) < actualObjs.indexOf(FILT2));
+        assertTrue(actualObjs.indexOf(FILT2) < actualObjs.indexOf(FILT3));
+        assertTrue(actualObjs.indexOf(FILT3) < actualObjs.indexOf(FILT5));
+        assertTrue(actualObjs.indexOf(FILT5) < actualObjs.indexOf(FILT7));
+        assertTrue(actualObjs.indexOf(FILT4) < actualObjs.indexOf(FILT6));
+    }
+    @Test
+    public void forward() throws Exception {
+        expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(3);
+        expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(NGRP2).times(3);
+        replay(mgr.flowObjectiveStore);
+        expectFwdObjs.forEach(fwdObj -> mgr.forward(DEV1, fwdObj));
+        // Wait for the pipeline operation to complete
+        int expectedTime = (BOUND + OFFSET) * 6;
+        assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFwdObjs.size(), actualObjs.size()));
+        assertTrue(actualObjs.indexOf(FWD1) < actualObjs.indexOf(FWD3));
+        assertTrue(actualObjs.indexOf(FWD3) < actualObjs.indexOf(FWD5));
+        assertTrue(actualObjs.indexOf(FWD2) < actualObjs.indexOf(FWD4));
+        assertTrue(actualObjs.indexOf(FWD4) < actualObjs.indexOf(FWD6));
+        verify(mgr.flowObjectiveStore);
+    }
+    @Test
+    public void forwardPending() throws Exception {
+        // Note: current logic will double check if the next obj need to be queued
+        //       it does not check when resubmitting pending next back to the queue
+        expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(null).times(2);
+        expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(null).times(2);
+        expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(3);
+        expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(NGRP2).times(3);
+        replay(mgr.flowObjectiveStore);
+        expectFwdObjs.forEach(fwdObj -> mgr.forward(DEV1, fwdObj));
+        // Trigger the next objectives
+, NEXT1);
+, NEXT2);
+        // Wait for the pipeline operation to complete
+        int expectedTime = (BOUND + OFFSET) * 8;
+        assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectFwdObjs.size() + 2, actualObjs.size()));
+        assertTrue(actualObjs.indexOf(NEXT1) < actualObjs.indexOf(FWD1));
+        assertTrue(actualObjs.indexOf(FWD1) < actualObjs.indexOf(FWD3));
+        assertTrue(actualObjs.indexOf(FWD3) < actualObjs.indexOf(FWD5));
+        assertTrue(actualObjs.indexOf(NEXT2) < actualObjs.indexOf(FWD2));
+        assertTrue(actualObjs.indexOf(FWD2) < actualObjs.indexOf(FWD4));
+        assertTrue(actualObjs.indexOf(FWD4) < actualObjs.indexOf(FWD6));
+        verify(mgr.flowObjectiveStore);
+    }
+    @Test
+    public void next() throws Exception {
+        // Note: ADD operation won't query this
+        expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(NGRP1).times(3);
+        expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(NGRP2).times(3);
+        replay(mgr.flowObjectiveStore);
+        expectNextObjs.forEach(nextObj ->, nextObj));
+        // Wait for the pipeline operation to complete
+        int expectedTime = (BOUND + OFFSET) * 8;
+        assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectNextObjs.size(), actualObjs.size()));
+        assertTrue(actualObjs.indexOf(NEXT1) < actualObjs.indexOf(NEXT3));
+        assertTrue(actualObjs.indexOf(NEXT3) < actualObjs.indexOf(NEXT5));
+        assertTrue(actualObjs.indexOf(NEXT5) < actualObjs.indexOf(NEXT7));
+        assertTrue(actualObjs.indexOf(NEXT2) < actualObjs.indexOf(NEXT4));
+        assertTrue(actualObjs.indexOf(NEXT4) < actualObjs.indexOf(NEXT6));
+        assertTrue(actualObjs.indexOf(NEXT6) < actualObjs.indexOf(NEXT8));
+        verify(mgr.flowObjectiveStore);
+    }
+    // FIXME We currently do not handle the case when an app sends edit/remove of a next id before add.
+    //       The edit/remove operation will be queued by pendingNext, and the add operation will be
+    //       queued by the ordering queue forever due to the deadlock. This can be improved by making
+    //       pendingForwards, pendingNexts and ordering queue caches.
+    @Test
+    @Ignore("Not supported")
+    public void nextPending() throws Exception {
+        // Note: current logic will double check if the next obj need to be queued
+        //       it does not check when resubmitting pending next back to the queue
+        expect(mgr.flowObjectiveStore.getNextGroup(NID1)).andReturn(null).times(6);
+        expect(mgr.flowObjectiveStore.getNextGroup(NID2)).andReturn(null).times(6);
+        replay(mgr.flowObjectiveStore);
+        expectNextObjsPending.forEach(nextObj ->, nextObj));
+        // Wait for the pipeline operation to complete
+        int expectedTime = (BOUND + OFFSET) * 8;
+        assertAfter(expectedTime, expectedTime * 5, () -> assertEquals(expectNextObjs.size(), actualObjs.size()));
+        assertTrue(actualObjs.indexOf(NEXT1) < actualObjs.indexOf(NEXT5));
+        assertTrue(actualObjs.indexOf(NEXT5) < actualObjs.indexOf(NEXT3));
+        assertTrue(actualObjs.indexOf(NEXT3) < actualObjs.indexOf(NEXT7));
+        assertTrue(actualObjs.indexOf(NEXT2) < actualObjs.indexOf(NEXT6));
+        assertTrue(actualObjs.indexOf(NEXT6) < actualObjs.indexOf(NEXT4));
+        assertTrue(actualObjs.indexOf(NEXT4) < actualObjs.indexOf(NEXT8));
+        verify(mgr.flowObjectiveStore);
+    }
+    /**
+     * Creates filtering objective builder with a serial number encoded in MPLS label.
+     * The serial number is used to identify same objective that occurs multiple times.
+     *
+     * @param portnum Port number
+     * @param vlanId VLAN Id
+     * @param mac MAC address
+     * @param serial Serial number
+     * @return Filtering objective builder
+     */
+    private static FilteringObjective.Builder buildFilteringObjective(PortNumber portnum, VlanId vlanId,
+                                                                      MacAddress mac, int serial) {
+        FilteringObjective.Builder fob = DefaultFilteringObjective.builder();
+        fob.withKey(Criteria.matchInPort(portnum))
+                .addCondition(Criteria.matchEthDst(mac))
+                .addCondition(Criteria.matchVlanId(VlanId.NONE))
+                .addCondition(Criteria.matchMplsLabel(MplsLabel.mplsLabel(serial)))
+                .withPriority(PRIORITY);
+        TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+        tBuilder.pushVlan().setVlanId(vlanId);
+        fob.withMeta(;
+        fob.permit().fromApp(APP_ID);
+        return fob;
+    }
+    /**
+     * Creates next objective builder.
+     *
+     * @param nextId next ID
+     * @param vlanId VLAN ID
+     * @param ports Set of ports that is in the given VLAN ID
+     *
+     * @return Next objective builder
+     */
+    private static NextObjective.Builder buildNextObjective(int nextId, VlanId vlanId, Collection<PortNumber> ports) {
+        TrafficSelector metadata =
+                DefaultTrafficSelector.builder().matchVlanId(vlanId).build();
+        NextObjective.Builder nextObjBuilder = DefaultNextObjective
+                .builder().withId(nextId)
+                .withType(NextObjective.Type.BROADCAST).fromApp(APP_ID)
+                .withMeta(metadata);
+        ports.forEach(port -> {
+            TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+            tBuilder.popVlan();
+            tBuilder.setOutput(port);
+            nextObjBuilder.addTreatment(;
+        });
+        return nextObjBuilder;
+    }
+    /**
+     * Creates forwarding objective builder.
+     *
+     * @param selector Traffic selector
+     * @param nextId next ID
+     * @return Forwarding objective builder
+     */
+    private static ForwardingObjective.Builder buildFwdObjective(TrafficSelector selector, int nextId) {
+        return DefaultForwardingObjective.builder()
+                .makePermanent()
+                .withSelector(selector)
+                .nextStep(nextId)
+                .fromApp(APP_ID)
+                .withPriority(PRIORITY)
+                .withFlag(ForwardingObjective.Flag.SPECIFIC);
+    }
\ No newline at end of file