Add an executor for flow batch operations.

- Added FlowBatchOperationExecutor to execute flow batch operations.
- Modified FlowManagerModule to use FlowBatchOperationExecutor and removed unused code.
- This task is a part of ONOS-1690 and ONOS-1692.

Change-Id: I84737ab43944785dd815effa8e455c5a3df55850
diff --git a/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchOperationExecutor.java b/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchOperationExecutor.java
new file mode 100644
index 0000000..d764014
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowmanager/FlowBatchOperationExecutor.java
@@ -0,0 +1,144 @@
+package net.onrc.onos.core.flowmanager;
+
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+
+import net.onrc.onos.api.batchoperation.BatchOperationEntry;
+import net.onrc.onos.api.flowmanager.Flow;
+import net.onrc.onos.api.flowmanager.FlowBatchId;
+import net.onrc.onos.api.flowmanager.FlowBatchOperation;
+import net.onrc.onos.api.flowmanager.FlowBatchOperation.Operator;
+import net.onrc.onos.api.flowmanager.FlowBatchState;
+import net.onrc.onos.core.matchaction.MatchActionId;
+import net.onrc.onos.core.matchaction.MatchActionOperationEntry;
+import net.onrc.onos.core.matchaction.MatchActionOperations;
+import net.onrc.onos.core.matchaction.MatchActionOperationsId;
+import net.onrc.onos.core.matchaction.MatchActionService;
+import net.onrc.onos.core.util.IdGenerator;
+
+/**
+ * Executes {@link FlowBatchOperation}.
+ * <p>
+ * Each {@link Flow} entry of an added batch is compiled to match-action
+ * operations, which are merged and handed to the {@link MatchActionService}.
+ */
+class FlowBatchOperationExecutor implements FlowBatchMapEventListener {
+    private final FlowBatchMap flowBatchMap;
+    // TODO: ADD later: private final FlowMap flowMap;
+    private final MatchActionService matchActionService;
+    private IdGenerator<MatchActionId> maIdGenerator;
+    private IdGenerator<MatchActionOperationsId> maoIdGenerator;
+
+    /**
+     * Creates the executor; {@code flowMap} is accepted but not stored yet.
+     */
+    public FlowBatchOperationExecutor(MatchActionService matchActionService,
+            FlowMap flowMap, FlowBatchMap flowBatchMap) {
+        this.flowBatchMap = flowBatchMap;
+        // TODO: Add later: this.flowMap = flowMap;
+        this.matchActionService = matchActionService;
+        this.maIdGenerator = matchActionService.getMatchActionIdGenerator();
+        this.maoIdGenerator = matchActionService.getMatchActionOperationsIdGenerator();
+    }
+
+    /**
+     * Compiles every flow in the {@link FlowBatchOperation} and merges the
+     * results into two {@link MatchActionOperations}: tail first, then head.
+     * <p>
+     * FIXME: Currently supporting ADD operations only (others are rejected).
+     * <p>
+     * FIXME: Currently supporting PacketPathFlow and SingleDstTreeFlow only.
+     * <p>
+     * FIXME: MatchActionOperations should have a dependency field to the other
+     * match-action operations, and this method should use it.
+     *
+     * @param op the {@link FlowBatchOperation} object
+     * @return a two-element list: merged tail operations, then merged head ones
+     */
+    private List<MatchActionOperations>
+            generateMatchActionOperationsList(FlowBatchOperation op) {
+
+        // MatchAction operations at head (ingress) switches.
+        MatchActionOperations headOps =
+                new MatchActionOperations(maoIdGenerator.getNewId());
+
+        // MatchAction operations at rest of the switches.
+        MatchActionOperations tailOps =
+                new MatchActionOperations(maoIdGenerator.getNewId());
+
+        for (BatchOperationEntry<Operator, ?> e : op.getOperations()) {
+
+            // Reject entries that are not ADD or whose target is not a Flow
+            if (e.getOperator() != FlowBatchOperation.Operator.ADD) {
+                throw new UnsupportedOperationException(
+                        "FlowManager supports ADD operations only.");
+            }
+            if (!(e.getTarget() instanceof Flow)) {
+                throw new IllegalStateException(
+                        "The target is not Flow object: " + e.getTarget());
+            }
+
+            // Compile the flow; exactly two operation sets are expected back
+            Flow flow = (Flow) e.getTarget();
+            List<MatchActionOperations> maOps = flow.compile(
+                    e.getOperator(), maIdGenerator, maoIdGenerator);
+            verifyNotNull(maOps, "Could not compile the flow: " + flow);
+            verify(maOps.size() == 2,
+                    "The flow generates unspported match-action operations.");
+
+            // Merge: compiled index 0 goes to tailOps, index 1 goes to headOps
+            for (MatchActionOperationEntry mae : maOps.get(0).getOperations()) {
+                verify(mae.getOperator() == MatchActionOperations.Operator.ADD);
+                tailOps.addOperation(mae);
+            }
+            for (MatchActionOperationEntry mae : maOps.get(1).getOperations()) {
+                verify(mae.getOperator() == MatchActionOperations.Operator.ADD);
+                headOps.addOperation(mae);
+            }
+        }
+
+        return Arrays.asList(tailOps, headOps);
+    }
+
+    @Override
+    public void flowBatchOperationAdded(FlowBatchId id, FlowBatchOperation flowOp) {
+
+        if (flowBatchMap.isLocal(id)) {
+            // TODO: update flowMap based on flowOp.
+
+            List<MatchActionOperations> maOps = generateMatchActionOperationsList(flowOp);
+            for (MatchActionOperations maOp : maOps) {
+                matchActionService.executeOperations(maOp);
+            }
+        }
+    }
+
+    @Override
+    public void flowBatchOperationRemoved(FlowBatchId id) {
+        // not used.
+    }
+
+    @Override
+    public void flowBatchOperationStateChanged(FlowBatchId id, FlowBatchState oldState,
+            FlowBatchState currentState) {
+        // TODO: update flow states in flowMap.
+        // TODO: update flow batch operation states in flowBatchMap.
+    }
+
+    /**
+     * Starts the executor by registering it as a listener on the flow batch map.
+     */
+    public void start() {
+        flowBatchMap.addListener(this);
+    }
+
+    /**
+     * Stops the executor by removing it from the flow batch map's listeners.
+     */
+    public void stop() {
+        flowBatchMap.removeListener(this);
+    }
+}
diff --git a/src/main/java/net/onrc/onos/core/flowmanager/FlowManagerModule.java b/src/main/java/net/onrc/onos/core/flowmanager/FlowManagerModule.java
index 14c63df..83092eb 100644
--- a/src/main/java/net/onrc/onos/core/flowmanager/FlowManagerModule.java
+++ b/src/main/java/net/onrc/onos/core/flowmanager/FlowManagerModule.java
@@ -1,8 +1,5 @@
 package net.onrc.onos.core.flowmanager;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -14,22 +11,16 @@
 import net.floodlightcontroller.core.module.FloodlightModuleException;
 import net.floodlightcontroller.core.module.IFloodlightModule;
 import net.floodlightcontroller.core.module.IFloodlightService;
-import net.onrc.onos.api.batchoperation.BatchOperationEntry;
 import net.onrc.onos.api.flowmanager.ConflictDetectionPolicy;
 import net.onrc.onos.api.flowmanager.Flow;
 import net.onrc.onos.api.flowmanager.FlowBatchHandle;
 import net.onrc.onos.api.flowmanager.FlowBatchId;
 import net.onrc.onos.api.flowmanager.FlowBatchOperation;
-import net.onrc.onos.api.flowmanager.FlowBatchOperation.Operator;
 import net.onrc.onos.api.flowmanager.FlowId;
 import net.onrc.onos.api.flowmanager.FlowManagerFloodlightService;
 import net.onrc.onos.api.flowmanager.FlowManagerListener;
 import net.onrc.onos.core.datagrid.ISharedCollectionsService;
 import net.onrc.onos.core.matchaction.MatchActionFloodlightService;
-import net.onrc.onos.core.matchaction.MatchActionId;
-import net.onrc.onos.core.matchaction.MatchActionOperationEntry;
-import net.onrc.onos.core.matchaction.MatchActionOperations;
-import net.onrc.onos.core.matchaction.MatchActionOperationsId;
 import net.onrc.onos.core.matchaction.MatchActionService;
 import net.onrc.onos.core.registry.IControllerRegistryService;
 import net.onrc.onos.core.util.IdBlockAllocator;
@@ -45,14 +36,13 @@
     private ConflictDetectionPolicy conflictDetectionPolicy;
     private FlowIdGeneratorWithIdBlockAllocator flowIdGenerator;
     private FlowBatchIdGeneratorWithIdBlockAllocator flowBatchIdGenerator;
-    private IdGenerator<MatchActionId> maIdGenerator;
-    private IdGenerator<MatchActionOperationsId> maoIdGenerator;
     private MatchActionService matchActionService;
     private IControllerRegistryService registryService;
     private ISharedCollectionsService sharedCollectionService;
     private FlowMap flowMap;
     private FlowBatchMap flowBatchMap;
     private FlowEventDispatcher flowEventDispatcher;
+    private FlowBatchOperationExecutor flowBatchOperationExecutor;
 
     @Override
     public Collection<Class<? extends IFloodlightService>> getModuleServices() {
@@ -92,14 +82,15 @@
                 new FlowIdGeneratorWithIdBlockAllocator(idBlockAllocator);
         flowBatchIdGenerator =
                 new FlowBatchIdGeneratorWithIdBlockAllocator(idBlockAllocator);
-        maIdGenerator = matchActionService.getMatchActionIdGenerator();
-        maoIdGenerator = matchActionService.getMatchActionOperationsIdGenerator();
 
         flowMap = new SharedFlowMap(sharedCollectionService);
         flowBatchMap = new SharedFlowBatchMap(sharedCollectionService);
         flowEventDispatcher =
                 new FlowEventDispatcher(flowMap, flowBatchMap, matchActionService);
         flowEventDispatcher.start();
+        flowBatchOperationExecutor =
+                new FlowBatchOperationExecutor(matchActionService, flowMap, flowBatchMap);
+        flowBatchOperationExecutor.start();
     }
 
     /**
@@ -183,55 +174,4 @@
     public void removeListener(FlowManagerListener listener) {
         flowEventDispatcher.removeListener(listener);
     }
-
-    private MatchActionOperations createNewMatchActionOperations() {
-        return new MatchActionOperations(maoIdGenerator.getNewId());
-    }
-
-    /**
-     * Generates the series of MatchActionOperations from the
-     * {@link FlowBatchOperation}.
-     * <p>
-     * Note: Currently supporting ADD operations only.
-     * <p>
-     * Note: Currently supporting PacketPathFlow and SingleDstTreeFlow only.
-     *
-     * @param op the {@link FlowBatchOperation} object
-     * @return the list of {@link MatchActionOperations} objects
-     */
-    private List<MatchActionOperations>
-            generateMatchActionOperationsList(FlowBatchOperation op) {
-        MatchActionOperations firstOps = createNewMatchActionOperations();
-        MatchActionOperations secondOps = createNewMatchActionOperations();
-
-        for (BatchOperationEntry<Operator, ?> e : op.getOperations()) {
-            if (e.getOperator() != FlowBatchOperation.Operator.ADD) {
-                throw new UnsupportedOperationException(
-                        "FlowManager supports ADD operations only.");
-            }
-            if (!(e.getTarget() instanceof Flow)) {
-                throw new IllegalStateException(
-                        "The target is not Flow object: " + e.getTarget());
-            }
-
-            Flow flow = (Flow) e.getTarget();
-            List<MatchActionOperations> maOps = flow.compile(
-                    e.getOperator(), maIdGenerator, maoIdGenerator);
-            checkNotNull(maOps, "Could not compile the flow: " + flow);
-            checkState(maOps.size() == 2,
-                    "The flow generates unspported match-action operations.");
-
-            for (MatchActionOperationEntry mae : maOps.get(0).getOperations()) {
-                checkState(mae.getOperator() == MatchActionOperations.Operator.ADD);
-                firstOps.addOperation(mae);
-            }
-
-            for (MatchActionOperationEntry mae : maOps.get(1).getOperations()) {
-                checkState(mae.getOperator() == MatchActionOperations.Operator.ADD);
-                secondOps.addOperation(mae);
-            }
-        }
-
-        return Arrays.asList(firstOps, secondOps);
-    }
 }