Implementation of new Flow Subsystem:

The subsystem no longer returns futures for tracking completion of work.
Notifications are explicitly returned via a callback mechanism. Also, the
subsystem is now asynchronous.

Change-Id: I1a4cef931c24820f9ae9ed9a5398f163f05dfbc9

more flowservice improvements

Change-Id: I5c9c1b6be4b2ebfa523b64f6f52e7634b7d3e05f

more flowservice impl

Change-Id: I05f6774460effb53ced8c36844bcda2f8f6c096f

Manager to store functional (at least I believe so)

Change-Id: I09b04989bd1004c98fe0bafed4c76714b9155d53

flow subsystem functional: need to fix unit tests

Change-Id: I1667f25b91320f625a03e5e1d5e92823184d9de0

flow subsystem functional

Change-Id: I429b3335c16d4fc16f5d55f233dd37c4d1d6111d

finished refactor of flow subsystem

Change-Id: I1899abc6ff6a974a2018d936cc555049c70a6804

fix for null flow provider to use new api

Change-Id: If2fd9bd5baf74d9c61c5c8085cef8bc2d204cbdc
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index 6d6cd9a..158764f 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -21,7 +21,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
-
+import com.google.common.util.concurrent.Futures;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -29,22 +29,25 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
 import org.onosproject.event.AbstractListenerRegistry;
 import org.onosproject.event.EventDeliveryService;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.CompletedBatchOperation;
-import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleBatchEntry;
-import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
 import org.onosproject.net.flow.FlowRuleBatchEvent;
 import org.onosproject.net.flow.FlowRuleBatchOperation;
 import org.onosproject.net.flow.FlowRuleBatchRequest;
 import org.onosproject.net.flow.FlowRuleEvent;
 import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleOperation;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
 import org.onosproject.net.flow.FlowRuleProvider;
 import org.onosproject.net.flow.FlowRuleProviderRegistry;
 import org.onosproject.net.flow.FlowRuleProviderService;
@@ -55,18 +58,16 @@
 import org.onosproject.net.provider.AbstractProviderService;
 import org.slf4j.Logger;
 
-import java.util.HashSet;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -90,7 +91,16 @@
 
     private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
 
-    private ExecutorService futureService;
+    protected ExecutorService deviceInstallers =
+            Executors.newCachedThreadPool(namedThreads("onos-device-installer-%d"));
+
+    protected ExecutorService operationsService =
+            Executors.newFixedThreadPool(32, namedThreads("onos-flowservice-operations-%d"));
+
+    private IdGenerator idGenerator;
+
+    private Map<Long, FlowOperationsProcessor> pendingFlowOperations = new
+            ConcurrentHashMap<>();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected FlowRuleStore store;
@@ -101,10 +111,15 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
     @Activate
     public void activate() {
-        futureService =
-                Executors.newFixedThreadPool(32, namedThreads("onos-provider-future-listeners-%d"));
+
+        idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
+
+
         store.setDelegate(delegate);
         eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
         log.info("Started");
@@ -112,8 +127,8 @@
 
     @Deactivate
     public void deactivate() {
-        futureService.shutdownNow();
-
+        deviceInstallers.shutdownNow();
+        operationsService.shutdownNow();
         store.unsetDelegate(delegate);
         eventDispatcher.removeSink(FlowRuleEvent.class);
         log.info("Stopped");
@@ -131,20 +146,20 @@
 
     @Override
     public void applyFlowRules(FlowRule... flowRules) {
-        Set<FlowRuleBatchEntry> toAddBatchEntries = Sets.newHashSet();
+        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
         for (int i = 0; i < flowRules.length; i++) {
-            toAddBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, flowRules[i]));
+            builder.add(flowRules[i]);
         }
-        applyBatch(new FlowRuleBatchOperation(toAddBatchEntries));
+        apply(builder.build());
     }
 
     @Override
     public void removeFlowRules(FlowRule... flowRules) {
-        Set<FlowRuleBatchEntry> toRemoveBatchEntries = Sets.newHashSet();
+        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
         for (int i = 0; i < flowRules.length; i++) {
-            toRemoveBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, flowRules[i]));
+            builder.remove(flowRules[i]);
         }
-        applyBatch(new FlowRuleBatchOperation(toRemoveBatchEntries));
+        apply(builder.build());
     }
 
     @Override
@@ -180,23 +195,38 @@
     }
 
     @Override
-    public Future<CompletedBatchOperation> applyBatch(
-            FlowRuleBatchOperation batch) {
-        Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
-                ArrayListMultimap.create();
-        List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
-        for (FlowRuleBatchEntry fbe : batch.getOperations()) {
-            final FlowRule f = fbe.target();
-            perDeviceBatches.put(f.deviceId(), fbe);
-        }
+    public Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch) {
 
-        for (DeviceId deviceId : perDeviceBatches.keySet()) {
-            FlowRuleBatchOperation b =
-                    new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
-            Future<CompletedBatchOperation> future = store.storeBatch(b);
-            futures.add(future);
-        }
-        return new FlowRuleBatchFuture(futures, perDeviceBatches);
+
+        FlowRuleOperations.Builder fopsBuilder = FlowRuleOperations.builder();
+        batch.getOperations().stream().forEach(op -> {
+                        switch (op.getOperator()) {
+                            case ADD:
+                                fopsBuilder.add(op.getTarget());
+                                break;
+                            case REMOVE:
+                                fopsBuilder.remove(op.getTarget());
+                                break;
+                            case MODIFY:
+                                fopsBuilder.modify(op.getTarget());
+                                break;
+                            default:
+                                log.warn("Unknown flow operation operator: {}", op.getOperator());
+
+                        }
+                }
+        );
+
+        apply(fopsBuilder.build());
+        return Futures.immediateFuture(
+                new CompletedBatchOperation(true,
+                                            Collections.emptySet(), null));
+
+    }
+
+    @Override
+    public void apply(FlowRuleOperations ops) {
+        operationsService.submit(new FlowOperationsProcessor(ops));
     }
 
     @Override
@@ -373,13 +403,19 @@
 
             }
         }
+
+        @Override
+        public void batchOperationCompleted(long batchId, CompletedBatchOperation operation) {
+            store.batchOperationComplete(FlowRuleBatchEvent.completed(
+                    new FlowRuleBatchRequest(batchId, Collections.emptySet()),
+                    operation
+            ));
+        }
     }
 
     // Store delegate to re-post events emitted from the store.
     private class InternalStoreDelegate implements FlowRuleStoreDelegate {
 
-        // FIXME set appropriate default and make it configurable
-        private static final int TIMEOUT_PER_OP = 500; // ms
 
         // TODO: Right now we only dispatch events at individual flowEntry level.
         // It may be more efficient for also dispatch events as a batch.
@@ -389,47 +425,55 @@
             switch (event.type()) {
             case BATCH_OPERATION_REQUESTED:
                 // Request has been forwarded to MASTER Node, and was
-                for (FlowRule entry : request.toAdd()) {
-                    eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
-                }
-                for (FlowRule entry : request.toRemove()) {
-                    eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
-                }
-                // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
+                request.ops().stream().forEach(
+                        op -> {
+                            switch (op.getOperator()) {
 
-                FlowRuleBatchOperation batchOperation = request.asBatchOperation();
+                                case ADD:
+                                    eventDispatcher.post(
+                                            new FlowRuleEvent(
+                                                    FlowRuleEvent.Type.RULE_ADD_REQUESTED,
+                                                    op.getTarget()));
+                                    break;
+                                case REMOVE:
+                                    eventDispatcher.post(
+                                            new FlowRuleEvent(
+                                                    FlowRuleEvent.Type.RULE_REMOVE_REQUESTED,
+                                                    op.getTarget()));
+                                    break;
+                                case MODIFY:
+                                    //TODO: do something here when the time comes.
+                                    break;
+                                default:
+                                    log.warn("Unknown flow operation operator: {}", op.getOperator());
+                            }
+                        }
+                );
+
+                DeviceId deviceId = event.deviceId();
+
+                FlowRuleBatchOperation batchOperation =
+                        request.asBatchOperation(deviceId);
 
                 FlowRuleProvider flowRuleProvider =
-                        getProvider(batchOperation.getOperations().get(0).target().deviceId());
-                final Future<CompletedBatchOperation> result =
-                        flowRuleProvider.executeBatch(batchOperation);
-                futureService.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        CompletedBatchOperation res;
-                        try {
-                            res = result.get(TIMEOUT_PER_OP * batchOperation.size(), TimeUnit.MILLISECONDS);
-                            store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
-                        } catch (TimeoutException | InterruptedException | ExecutionException e) {
-                            log.warn("Something went wrong with the batch operation {}",
-                                     request.batchId(), e);
+                        getProvider(deviceId);
 
-                            Set<FlowRule> failures = new HashSet<>(batchOperation.size());
-                            for (FlowRuleBatchEntry op : batchOperation.getOperations()) {
-                                failures.add(op.target());
-                            }
-                            res = new CompletedBatchOperation(false, failures);
-                            store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
-                        }
-                    }
-                });
+                flowRuleProvider.executeBatch(batchOperation);
+
                 break;
 
             case BATCH_OPERATION_COMPLETED:
-                // MASTER Node has pushed the batch down to the Device
 
-                // Note: RULE_ADDED will be posted
-                // when Flow was actually confirmed by stats reply.
+                FlowOperationsProcessor fops = pendingFlowOperations.remove(
+                        event.subject().batchId());
+                if (event.result().isSuccess()) {
+                    if (fops != null) {
+                        fops.satisfy(event.deviceId());
+                    }
+                } else {
+                    fops.fail(event.deviceId(), event.result().failedItems());
+                }
+
                 break;
 
             default:
@@ -438,141 +482,100 @@
         }
     }
 
-    private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
+    private class FlowOperationsProcessor implements Runnable {
 
-        private final List<Future<CompletedBatchOperation>> futures;
-        private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
-        private final AtomicReference<BatchState> state;
-        private CompletedBatchOperation overall;
+        private final List<Set<FlowRuleOperation>> stages;
+        private final FlowRuleOperationsContext context;
+        private final FlowRuleOperations fops;
+        private final AtomicBoolean hasFailed = new AtomicBoolean(false);
 
-        public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
-                Multimap<DeviceId, FlowRuleBatchEntry> batches) {
-            this.futures = futures;
-            this.batches = batches;
-            this.state = new AtomicReference<>(BatchState.STARTED);
-        }
+        private Set<DeviceId> pendingDevices;
 
-        @Override
-        public boolean cancel(boolean mayInterruptIfRunning) {
-            if (state.get() == BatchState.FINISHED) {
-                return false;
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("Cancelling FlowRuleBatchFuture",
-                          new RuntimeException("Just printing backtrace"));
-            }
-            if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
-                return false;
-            }
-            cleanUpBatch();
-            for (Future<CompletedBatchOperation> f : futures) {
-                f.cancel(mayInterruptIfRunning);
-            }
-            return true;
-        }
+        public FlowOperationsProcessor(FlowRuleOperations ops) {
 
-        @Override
-        public boolean isCancelled() {
-            return state.get() == BatchState.CANCELLED;
-        }
+            this.stages = Lists.newArrayList(ops.stages());
+            this.context = ops.callback();
+            this.fops = ops;
+            pendingDevices = Sets.newConcurrentHashSet();
 
-        @Override
-        public boolean isDone() {
-            return state.get() == BatchState.FINISHED;
-        }
-
-
-        @Override
-        public CompletedBatchOperation get() throws InterruptedException,
-            ExecutionException {
-
-            if (isDone()) {
-                return overall;
-            }
-
-            boolean success = true;
-            Set<FlowRule> failed = Sets.newHashSet();
-            Set<Long> failedIds = Sets.newHashSet();
-            CompletedBatchOperation completed;
-            for (Future<CompletedBatchOperation> future : futures) {
-                completed = future.get();
-                success = validateBatchOperation(failed, failedIds, completed);
-            }
-
-            return finalizeBatchOperation(success, failed, failedIds);
 
         }
 
         @Override
-        public CompletedBatchOperation get(long timeout, TimeUnit unit)
-                throws InterruptedException, ExecutionException,
-                TimeoutException {
-
-            if (isDone()) {
-                return overall;
-            }
-            boolean success = true;
-            Set<FlowRule> failed = Sets.newHashSet();
-            Set<Long> failedIds = Sets.newHashSet();
-            CompletedBatchOperation completed;
-            for (Future<CompletedBatchOperation> future : futures) {
-                completed = future.get(timeout, unit);
-                success = validateBatchOperation(failed, failedIds, completed);
-            }
-            return finalizeBatchOperation(success, failed, failedIds);
-        }
-
-        private boolean validateBatchOperation(Set<FlowRule> failed,
-                                               Set<Long> failedIds,
-                                               CompletedBatchOperation completed) {
-
-            if (isCancelled()) {
-                throw new CancellationException();
-            }
-            if (!completed.isSuccess()) {
-                log.warn("FlowRuleBatch failed: {}", completed);
-                failed.addAll(completed.failedItems());
-                failedIds.addAll(completed.failedIds());
-                cleanUpBatch();
-                cancelAllSubBatches();
-                return false;
-            }
-            return true;
-        }
-
-        private void cancelAllSubBatches() {
-            for (Future<CompletedBatchOperation> f : futures) {
-                f.cancel(true);
+        public void run() {
+            if (stages.size() > 0) {
+                process(stages.remove(0));
+            } else if (!hasFailed.get() && context != null) {
+                context.onSuccess(fops);
             }
         }
 
-        private CompletedBatchOperation finalizeBatchOperation(boolean success,
-                                                               Set<FlowRule> failed,
-                                                               Set<Long> failedIds) {
-            synchronized (this) {
-                if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
-                    if (state.get() == BatchState.FINISHED) {
-                        return overall;
+        private void process(Set<FlowRuleOperation> ops) {
+            Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
+                    ArrayListMultimap.create();
+
+            FlowRuleBatchEntry fbe;
+            for (FlowRuleOperation flowRuleOperation : ops) {
+                switch (flowRuleOperation.type()) {
+                    // FIXME: Brian needs imagination when creating class names.
+                    case ADD:
+                        fbe = new FlowRuleBatchEntry(
+                                FlowRuleBatchEntry.FlowRuleOperation.ADD, flowRuleOperation.rule());
+                        break;
+                    case MODIFY:
+                        fbe = new FlowRuleBatchEntry(
+                                FlowRuleBatchEntry.FlowRuleOperation.MODIFY, flowRuleOperation.rule());
+                        break;
+                    case REMOVE:
+                        fbe = new FlowRuleBatchEntry(
+                                FlowRuleBatchEntry.FlowRuleOperation.REMOVE, flowRuleOperation.rule());
+                        break;
+                    default:
+                        throw new UnsupportedOperationException("Unknown flow rule type " + flowRuleOperation.type());
+                }
+                pendingDevices.add(flowRuleOperation.rule().deviceId());
+                perDeviceBatches.put(flowRuleOperation.rule().deviceId(), fbe);
+            }
+
+
+            for (DeviceId deviceId : perDeviceBatches.keySet()) {
+                Long id = idGenerator.getNewId();
+                final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
+                                               deviceId, id);
+                pendingFlowOperations.put(id, this);
+                deviceInstallers.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        store.storeBatch(b);
                     }
-                    throw new CancellationException();
-                }
-                overall = new CompletedBatchOperation(success, failed, failedIds);
-                return overall;
+                });
             }
         }
 
-        private void cleanUpBatch() {
-            log.debug("cleaning up batch");
-            // TODO convert these into a batch?
-            for (FlowRuleBatchEntry fbe : batches.values()) {
-                if (fbe.operator() == FlowRuleOperation.ADD ||
-                    fbe.operator() == FlowRuleOperation.MODIFY) {
-                    store.deleteFlowRule(fbe.target());
-                } else if (fbe.operator() == FlowRuleOperation.REMOVE) {
-                    store.removeFlowRule(new DefaultFlowEntry(fbe.target()));
-                    store.storeFlowRule(fbe.target());
-                }
+        public void satisfy(DeviceId devId) {
+            pendingDevices.remove(devId);
+            if (pendingDevices.isEmpty()) {
+                operationsService.submit(this);
             }
         }
+
+
+
+        public void fail(DeviceId devId, Set<? extends FlowRule> failures) {
+            hasFailed.set(true);
+            pendingDevices.remove(devId);
+            if (pendingDevices.isEmpty()) {
+                operationsService.submit(this);
+            }
+
+            if (context != null) {
+                final FlowRuleOperations.Builder failedOpsBuilder =
+                    FlowRuleOperations.builder();
+                failures.stream().forEach(failedOpsBuilder::add);
+
+                context.onError(failedOpsBuilder.build());
+            }
+        }
+
     }
 }
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
index 8b6d8ad..685fa70 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentManager.java
@@ -1151,6 +1151,7 @@
          */
         protected Future<CompletedBatchOperation> applyNextBatch(List<CompletedIntentUpdate> updates) {
             //TODO test this. (also, maybe save this batch)
+
             FlowRuleBatchOperation batch = createFlowRuleBatchOperation(updates);
             if (batch.size() > 0) {
                 //FIXME apply batch might throw an exception
@@ -1165,7 +1166,7 @@
         }
 
         private FlowRuleBatchOperation createFlowRuleBatchOperation(List<CompletedIntentUpdate> intentUpdates) {
-            FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList());
+            FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList(), null, 0);
             for (CompletedIntentUpdate update : intentUpdates) {
                 FlowRuleBatchOperation currentBatch = update.currentBatch();
                 if (currentBatch != null) {
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/LinkCollectionIntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/LinkCollectionIntentInstaller.java
index 583d7e3..816fc12 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/LinkCollectionIntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/LinkCollectionIntentInstaller.java
@@ -98,6 +98,7 @@
             outputPorts.put(egressPoint.deviceId(), egressPoint.port());
         }
 
+        //FIXME change to new api
         FlowRuleBatchOperation batchOperation =
                 new FlowRuleBatchOperation(outputPorts
                         .keys()
@@ -105,7 +106,7 @@
                         .map(deviceId -> createBatchEntry(operation,
                                                    intent, deviceId,
                                                    outputPorts.get(deviceId)))
-                        .collect(Collectors.toList()));
+                        .collect(Collectors.toList()), null, 0);
 
         return Collections.singletonList(batchOperation);
     }
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/OpticalPathIntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/OpticalPathIntentInstaller.java
index e958c58..5b226a3 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/OpticalPathIntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/OpticalPathIntentInstaller.java
@@ -181,6 +181,7 @@
                                             true);
         rules.add(new FlowRuleBatchEntry(operation, rule));
 
-        return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+        //FIXME change to new api
+        return Lists.newArrayList(new FlowRuleBatchOperation(rules, null, 0));
     }
 }
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/PathIntentInstaller.java b/core/net/src/main/java/org/onosproject/net/intent/impl/PathIntentInstaller.java
index 1c8ea7e..b90be50 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/PathIntentInstaller.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/PathIntentInstaller.java
@@ -108,7 +108,8 @@
                                              intent.id().fingerprint()));
             prev = link.dst();
         }
-        return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+        //FIXME this should change to new api.
+        return Lists.newArrayList(new FlowRuleBatchOperation(rules, null, 0));
     }
 
     @Override
@@ -138,7 +139,8 @@
                                              intent.id().fingerprint()));
             prev = link.dst();
         }
-        return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+        // FIXME this should change to new api
+        return Lists.newArrayList(new FlowRuleBatchOperation(rules, null, 0));
     }
 
     @Override
diff --git a/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java b/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java
index c3f9ba0..87fc585 100644
--- a/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java
+++ b/core/net/src/test/java/org/onosproject/event/impl/TestEventDispatcher.java
@@ -31,7 +31,7 @@
 
     @Override
     @SuppressWarnings("unchecked")
-    public void post(Event event) {
+    public synchronized void post(Event event) {
         EventSink sink = getSink(event.getClass());
         checkState(sink != null, "No sink for event %s", event);
         sink.process(event);
diff --git a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
index 5a79b43..3fd8b5a 100644
--- a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
@@ -20,12 +20,15 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
-
+import com.google.common.util.concurrent.MoreExecutors;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
 import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.core.IdGenerator;
+import org.onosproject.core.Version;
 import org.onosproject.event.impl.TestEventDispatcher;
 import org.onosproject.net.DefaultDevice;
 import org.onosproject.net.Device;
@@ -36,7 +39,6 @@
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceServiceAdapter;
-import org.onosproject.net.flow.BatchOperation;
 import org.onosproject.net.flow.CompletedBatchOperation;
 import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.DefaultFlowRule;
@@ -72,6 +74,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.*;
 import static org.onosproject.net.flow.FlowRuleEvent.Type.*;
@@ -97,12 +100,16 @@
     protected TestListener listener = new TestListener();
     private ApplicationId appId;
 
+
     @Before
     public void setUp() {
         mgr = new FlowRuleManager();
         mgr.store = new SimpleFlowRuleStore();
         mgr.eventDispatcher = new TestEventDispatcher();
         mgr.deviceService = new TestDeviceService();
+        mgr.coreService = new TestCoreService();
+        mgr.operationsService = MoreExecutors.newDirectExecutorService();
+        mgr.deviceInstallers = MoreExecutors.newDirectExecutorService();
         service = mgr;
         registry = mgr;
 
@@ -246,14 +253,23 @@
 
     @Test
     public void flowRemoved() {
+
         FlowRule f1 = addFlowRule(1);
         FlowRule f2 = addFlowRule(2);
         StoredFlowEntry fe1 = new DefaultFlowEntry(f1);
         FlowEntry fe2 = new DefaultFlowEntry(f2);
+
+
         providerService.pushFlowMetrics(DID, ImmutableList.of(fe1, fe2));
         service.removeFlowRules(f1);
+
         fe1.setState(FlowEntryState.REMOVED);
+
+
+
         providerService.flowRemoved(fe1);
+
+
         validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADDED,
                        RULE_ADDED, RULE_REMOVE_REQUESTED, RULE_REMOVED);
 
@@ -263,11 +279,13 @@
         FlowRule f3 = flowRule(3, 3);
         FlowEntry fe3 = new DefaultFlowEntry(f3);
         service.applyFlowRules(f3);
+
         providerService.pushFlowMetrics(DID, Collections.singletonList(fe3));
         validateEvents(RULE_ADD_REQUESTED, RULE_ADDED);
 
         providerService.flowRemoved(fe3);
         validateEvents();
+
     }
 
     @Test
@@ -281,7 +299,6 @@
         FlowEntry fe1 = new DefaultFlowEntry(f1);
         FlowEntry fe2 = new DefaultFlowEntry(f2);
 
-
         //FlowRule updatedF1 = flowRule(f1, FlowRuleState.ADDED);
         //FlowRule updatedF2 = flowRule(f2, FlowRuleState.ADDED);
 
@@ -388,7 +405,7 @@
                 FlowRuleBatchEntry.FlowRuleOperation.ADD, f2);
 
         FlowRuleBatchOperation fbo = new FlowRuleBatchOperation(
-                Lists.newArrayList(fbe1, fbe2));
+                Lists.newArrayList(fbe1, fbe2), null, 0);
         Future<CompletedBatchOperation> future = mgr.applyBatch(fbo);
         assertTrue("Entries in wrong state",
                    validateState(ImmutableMap.of(
@@ -406,53 +423,6 @@
 
     }
 
-    @Test
-    public void cancelBatch() {
-        FlowRule f1 = flowRule(1, 1);
-        FlowRule f2 = flowRule(2, 2);
-
-
-        mgr.applyFlowRules(f1);
-
-        assertTrue("Entries in wrong state",
-                   validateState(ImmutableMap.of(
-                           f1, FlowEntryState.PENDING_ADD)));
-
-        FlowEntry fe1 = new DefaultFlowEntry(f1);
-        providerService.pushFlowMetrics(DID, Collections.<FlowEntry>singletonList(fe1));
-
-        assertTrue("Entries in wrong state",
-                   validateState(ImmutableMap.of(
-                           f1, FlowEntryState.ADDED)));
-
-
-        FlowRuleBatchEntry fbe1 = new FlowRuleBatchEntry(
-                FlowRuleBatchEntry.FlowRuleOperation.REMOVE, f1);
-
-        FlowRuleBatchEntry fbe2 = new FlowRuleBatchEntry(
-                FlowRuleBatchEntry.FlowRuleOperation.ADD, f2);
-
-        FlowRuleBatchOperation fbo = new FlowRuleBatchOperation(
-                Lists.newArrayList(fbe1, fbe2));
-        Future<CompletedBatchOperation> future = mgr.applyBatch(fbo);
-
-        future.cancel(true);
-
-        assertTrue(flowCount() == 2);
-
-        /*
-         * Rule f1 should be re-added to the list and therefore be in a pending add
-         * state.
-         */
-        assertTrue("Entries in wrong state",
-                   validateState(ImmutableMap.of(
-                           f2, FlowEntryState.PENDING_REMOVE,
-                           f1, FlowEntryState.PENDING_ADD)));
-
-
-    }
-
-
     private static class TestListener implements FlowRuleListener {
         final List<FlowRuleEvent> events = new ArrayList<>();
 
@@ -528,9 +498,8 @@
         }
 
         @Override
-        public ListenableFuture<CompletedBatchOperation> executeBatch(
-                BatchOperation<FlowRuleBatchEntry> batch) {
-            return new TestInstallationFuture();
+        public void executeBatch(FlowRuleBatchOperation batch) {
+         // TODO: stub does nothing with the batch yet; need to call batchOperationComplete
         }
 
         private class TestInstallationFuture
@@ -554,14 +523,14 @@
             @Override
             public CompletedBatchOperation get()
                     throws InterruptedException, ExecutionException {
-                return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet());
+                return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet(), null);
             }
 
             @Override
             public CompletedBatchOperation get(long timeout, TimeUnit unit)
                     throws InterruptedException,
                     ExecutionException, TimeoutException {
-                return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet());
+                return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet(), null);
             }
 
             @Override
@@ -644,4 +613,37 @@
         }
     }
 
+    private class TestCoreService implements CoreService { // minimal CoreService stub for this test
+        @Override
+        public Version version() {
+            return null; // stub: no version is provided
+        }
+
+        @Override
+        public Set<ApplicationId> getAppIds() {
+            return null; // stub: no application ids are tracked
+        }
+
+        @Override
+        public ApplicationId getAppId(Short id) {
+            return null; // stub: lookup always yields null
+        }
+
+        @Override
+        public ApplicationId registerApplication(String identifier) {
+            return null; // stub: nothing is registered
+        }
+
+        @Override
+        public IdGenerator getIdGenerator(String topic) {
+            return new IdGenerator() { // topic is ignored; every call builds a new generator
+                private AtomicLong counter = new AtomicLong(0); // first id handed out is 0
+                @Override
+                public long getNewId() {
+                    return counter.getAndIncrement(); // yields 0, 1, 2, ... per generator instance
+                }
+            };
+        }
+    }
+
 }
diff --git a/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java b/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
index 17a9498..3570c77 100644
--- a/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/intent/impl/IntentManagerTest.java
@@ -201,7 +201,7 @@
             FlowRule fr = new IntentTestsMocks.MockFlowRule(intent.number().intValue());
             List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
             rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, fr));
-            return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+            return Lists.newArrayList(new FlowRuleBatchOperation(rules, fr.deviceId(), 0));
         }
 
         @Override
@@ -209,7 +209,7 @@
             FlowRule fr = new IntentTestsMocks.MockFlowRule(intent.number().intValue());
             List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
             rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, fr));
-            return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+            return Lists.newArrayList(new FlowRuleBatchOperation(rules, fr.deviceId(), 0));
         }
 
         @Override
@@ -219,7 +219,7 @@
             List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
             rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, fr));
             rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, fr2));
-            return Lists.newArrayList(new FlowRuleBatchOperation(rules));
+            return Lists.newArrayList(new FlowRuleBatchOperation(rules, fr.deviceId(), 0));
         }
     }
 
diff --git a/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java b/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java
index e956575..72de4f0 100644
--- a/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java
+++ b/core/net/src/test/java/org/onosproject/net/intent/impl/MockFlowRuleService.java
@@ -27,6 +27,7 @@
 import org.onosproject.net.flow.FlowRuleBatchEntry;
 import org.onosproject.net.flow.FlowRuleBatchOperation;
 import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleOperations;
 import org.onosproject.net.flow.FlowRuleService;
 
 import com.google.common.collect.ImmutableSet;
@@ -45,11 +46,11 @@
 
     public void setFuture(boolean success, long intentId) {
         if (success) {
-            future = Futures.immediateFuture(new CompletedBatchOperation(true, Collections.emptySet()));
+            future = Futures.immediateFuture(new CompletedBatchOperation(true, Collections.emptySet(), null));
         } else {
             final Set<Long> failedIds = ImmutableSet.of(intentId);
             future = Futures.immediateFuture(
-                    new CompletedBatchOperation(false, flows, failedIds));
+                    new CompletedBatchOperation(false, flows, failedIds, null));
         }
     }
 
@@ -74,6 +75,11 @@
     }
 
     @Override
+    public void apply(FlowRuleOperations ops) {
+        // No-op: this mock does nothing with the supplied operations.
+    }
+
+    @Override
     public int getFlowRuleCount() {
         return flows.size();
     }