Implementation of new Flow Subsystem:

The subsystem no longer returns futures for tracking completion of work.
Notifications are explicitly returned via a callback mechanism. Also, the
subsystem is now asynchronous.

Change-Id: I1a4cef931c24820f9ae9ed9a5398f163f05dfbc9

more flowservice improvements

Change-Id: I5c9c1b6be4b2ebfa523b64f6f52e7634b7d3e05f

more flowservice impl

Change-Id: I05f6774460effb53ced8c36844bcda2f8f6c096f

Manager to store functional (at least I believe so)

Change-Id: I09b04989bd1004c98fe0bafed4c76714b9155d53

flow subsystem functional: need to fix unit tests

Change-Id: I1667f25b91320f625a03e5e1d5e92823184d9de0

flow subsystem functional

Change-Id: I429b3335c16d4fc16f5d55f233dd37c4d1d6111d

finished refactor of flow subsystem

Change-Id: I1899abc6ff6a974a2018d936cc555049c70a6804

fix for null flow provider to use new api

Change-Id: If2fd9bd5baf74d9c61c5c8085cef8bc2d204cbdc
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index 6d6cd9a..158764f 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -21,7 +21,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
-
+import com.google.common.util.concurrent.Futures;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -29,22 +29,25 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
 import org.onosproject.event.AbstractListenerRegistry;
 import org.onosproject.event.EventDeliveryService;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.CompletedBatchOperation;
-import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleBatchEntry;
-import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
 import org.onosproject.net.flow.FlowRuleBatchEvent;
 import org.onosproject.net.flow.FlowRuleBatchOperation;
 import org.onosproject.net.flow.FlowRuleBatchRequest;
 import org.onosproject.net.flow.FlowRuleEvent;
 import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleOperation;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
 import org.onosproject.net.flow.FlowRuleProvider;
 import org.onosproject.net.flow.FlowRuleProviderRegistry;
 import org.onosproject.net.flow.FlowRuleProviderService;
@@ -55,18 +58,16 @@
 import org.onosproject.net.provider.AbstractProviderService;
 import org.slf4j.Logger;
 
-import java.util.HashSet;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -90,7 +91,16 @@
 
     private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
 
-    private ExecutorService futureService;
+    protected ExecutorService deviceInstallers =
+            Executors.newCachedThreadPool(namedThreads("onos-device-installer-%d"));
+
+    protected ExecutorService operationsService =
+            Executors.newFixedThreadPool(32, namedThreads("onos-flowservice-operations-%d"));
+
+    private IdGenerator idGenerator;
+
+    private Map<Long, FlowOperationsProcessor> pendingFlowOperations = new
+            ConcurrentHashMap<>();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected FlowRuleStore store;
@@ -101,10 +111,15 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
     @Activate
     public void activate() {
-        futureService =
-                Executors.newFixedThreadPool(32, namedThreads("onos-provider-future-listeners-%d"));
+
+        idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
+
+
         store.setDelegate(delegate);
         eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
         log.info("Started");
@@ -112,8 +127,8 @@
 
     @Deactivate
     public void deactivate() {
-        futureService.shutdownNow();
-
+        deviceInstallers.shutdownNow();
+        operationsService.shutdownNow();
         store.unsetDelegate(delegate);
         eventDispatcher.removeSink(FlowRuleEvent.class);
         log.info("Stopped");
@@ -131,20 +146,20 @@
 
     @Override
     public void applyFlowRules(FlowRule... flowRules) {
-        Set<FlowRuleBatchEntry> toAddBatchEntries = Sets.newHashSet();
+        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
         for (int i = 0; i < flowRules.length; i++) {
-            toAddBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, flowRules[i]));
+            builder.add(flowRules[i]);
         }
-        applyBatch(new FlowRuleBatchOperation(toAddBatchEntries));
+        apply(builder.build());
     }
 
     @Override
     public void removeFlowRules(FlowRule... flowRules) {
-        Set<FlowRuleBatchEntry> toRemoveBatchEntries = Sets.newHashSet();
+        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
         for (int i = 0; i < flowRules.length; i++) {
-            toRemoveBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, flowRules[i]));
+            builder.remove(flowRules[i]);
         }
-        applyBatch(new FlowRuleBatchOperation(toRemoveBatchEntries));
+        apply(builder.build());
     }
 
     @Override
@@ -180,23 +195,38 @@
     }
 
     @Override
-    public Future<CompletedBatchOperation> applyBatch(
-            FlowRuleBatchOperation batch) {
-        Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
-                ArrayListMultimap.create();
-        List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
-        for (FlowRuleBatchEntry fbe : batch.getOperations()) {
-            final FlowRule f = fbe.target();
-            perDeviceBatches.put(f.deviceId(), fbe);
-        }
+    public Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch) {
 
-        for (DeviceId deviceId : perDeviceBatches.keySet()) {
-            FlowRuleBatchOperation b =
-                    new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
-            Future<CompletedBatchOperation> future = store.storeBatch(b);
-            futures.add(future);
-        }
-        return new FlowRuleBatchFuture(futures, perDeviceBatches);
+
+        FlowRuleOperations.Builder fopsBuilder = FlowRuleOperations.builder();
+        batch.getOperations().stream().forEach(op -> {
+                        switch (op.getOperator()) {
+                            case ADD:
+                                fopsBuilder.add(op.getTarget());
+                                break;
+                            case REMOVE:
+                                fopsBuilder.remove(op.getTarget());
+                                break;
+                            case MODIFY:
+                                fopsBuilder.modify(op.getTarget());
+                                break;
+                            default:
+                                log.warn("Unknown flow operation operator: {}", op.getOperator());
+
+                        }
+                }
+        );
+
+        apply(fopsBuilder.build());
+        return Futures.immediateFuture(
+                new CompletedBatchOperation(true,
+                                            Collections.emptySet(), null));
+
+    }
+
+    @Override
+    public void apply(FlowRuleOperations ops) {
+        operationsService.submit(new FlowOperationsProcessor(ops));
     }
 
     @Override
@@ -373,13 +403,19 @@
 
             }
         }
+
+        @Override
+        public void batchOperationCompleted(long batchId, CompletedBatchOperation operation) {
+            store.batchOperationComplete(FlowRuleBatchEvent.completed(
+                    new FlowRuleBatchRequest(batchId, Collections.emptySet()),
+                    operation
+            ));
+        }
     }
 
     // Store delegate to re-post events emitted from the store.
     private class InternalStoreDelegate implements FlowRuleStoreDelegate {
 
-        // FIXME set appropriate default and make it configurable
-        private static final int TIMEOUT_PER_OP = 500; // ms
 
         // TODO: Right now we only dispatch events at individual flowEntry level.
         // It may be more efficient for also dispatch events as a batch.
@@ -389,47 +425,55 @@
             switch (event.type()) {
             case BATCH_OPERATION_REQUESTED:
                 // Request has been forwarded to MASTER Node, and was
-                for (FlowRule entry : request.toAdd()) {
-                    eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
-                }
-                for (FlowRule entry : request.toRemove()) {
-                    eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
-                }
-                // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
+                request.ops().stream().forEach(
+                        op -> {
+                            switch (op.getOperator()) {
 
-                FlowRuleBatchOperation batchOperation = request.asBatchOperation();
+                                case ADD:
+                                    eventDispatcher.post(
+                                            new FlowRuleEvent(
+                                                    FlowRuleEvent.Type.RULE_ADD_REQUESTED,
+                                                    op.getTarget()));
+                                    break;
+                                case REMOVE:
+                                    eventDispatcher.post(
+                                            new FlowRuleEvent(
+                                                    FlowRuleEvent.Type.RULE_REMOVE_REQUESTED,
+                                                    op.getTarget()));
+                                    break;
+                                case MODIFY:
+                                    //TODO: do something here when the time comes.
+                                    break;
+                                default:
+                                    log.warn("Unknown flow operation operator: {}", op.getOperator());
+                            }
+                        }
+                );
+
+                DeviceId deviceId = event.deviceId();
+
+                FlowRuleBatchOperation batchOperation =
+                        request.asBatchOperation(deviceId);
 
                 FlowRuleProvider flowRuleProvider =
-                        getProvider(batchOperation.getOperations().get(0).target().deviceId());
-                final Future<CompletedBatchOperation> result =
-                        flowRuleProvider.executeBatch(batchOperation);
-                futureService.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        CompletedBatchOperation res;
-                        try {
-                            res = result.get(TIMEOUT_PER_OP * batchOperation.size(), TimeUnit.MILLISECONDS);
-                            store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
-                        } catch (TimeoutException | InterruptedException | ExecutionException e) {
-                            log.warn("Something went wrong with the batch operation {}",
-                                     request.batchId(), e);
+                        getProvider(deviceId);
 
-                            Set<FlowRule> failures = new HashSet<>(batchOperation.size());
-                            for (FlowRuleBatchEntry op : batchOperation.getOperations()) {
-                                failures.add(op.target());
-                            }
-                            res = new CompletedBatchOperation(false, failures);
-                            store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
-                        }
-                    }
-                });
+                flowRuleProvider.executeBatch(batchOperation);
+
                 break;
 
             case BATCH_OPERATION_COMPLETED:
-                // MASTER Node has pushed the batch down to the Device
 
-                // Note: RULE_ADDED will be posted
-                // when Flow was actually confirmed by stats reply.
+                FlowOperationsProcessor fops = pendingFlowOperations.remove(
+                        event.subject().batchId());
+                if (event.result().isSuccess()) {
+                    if (fops != null) {
+                        fops.satisfy(event.deviceId());
+                    }
+                } else {
+                    fops.fail(event.deviceId(), event.result().failedItems());
+                }
+
                 break;
 
             default:
@@ -438,141 +482,100 @@
         }
     }
 
-    private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
+    private class FlowOperationsProcessor implements Runnable {
 
-        private final List<Future<CompletedBatchOperation>> futures;
-        private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
-        private final AtomicReference<BatchState> state;
-        private CompletedBatchOperation overall;
+        private final List<Set<FlowRuleOperation>> stages;
+        private final FlowRuleOperationsContext context;
+        private final FlowRuleOperations fops;
+        private final AtomicBoolean hasFailed = new AtomicBoolean(false);
 
-        public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
-                Multimap<DeviceId, FlowRuleBatchEntry> batches) {
-            this.futures = futures;
-            this.batches = batches;
-            this.state = new AtomicReference<>(BatchState.STARTED);
-        }
+        private Set<DeviceId> pendingDevices;
 
-        @Override
-        public boolean cancel(boolean mayInterruptIfRunning) {
-            if (state.get() == BatchState.FINISHED) {
-                return false;
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("Cancelling FlowRuleBatchFuture",
-                          new RuntimeException("Just printing backtrace"));
-            }
-            if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
-                return false;
-            }
-            cleanUpBatch();
-            for (Future<CompletedBatchOperation> f : futures) {
-                f.cancel(mayInterruptIfRunning);
-            }
-            return true;
-        }
+        public FlowOperationsProcessor(FlowRuleOperations ops) {
 
-        @Override
-        public boolean isCancelled() {
-            return state.get() == BatchState.CANCELLED;
-        }
+            this.stages = Lists.newArrayList(ops.stages());
+            this.context = ops.callback();
+            this.fops = ops;
+            pendingDevices = Sets.newConcurrentHashSet();
 
-        @Override
-        public boolean isDone() {
-            return state.get() == BatchState.FINISHED;
-        }
-
-
-        @Override
-        public CompletedBatchOperation get() throws InterruptedException,
-            ExecutionException {
-
-            if (isDone()) {
-                return overall;
-            }
-
-            boolean success = true;
-            Set<FlowRule> failed = Sets.newHashSet();
-            Set<Long> failedIds = Sets.newHashSet();
-            CompletedBatchOperation completed;
-            for (Future<CompletedBatchOperation> future : futures) {
-                completed = future.get();
-                success = validateBatchOperation(failed, failedIds, completed);
-            }
-
-            return finalizeBatchOperation(success, failed, failedIds);
 
         }
 
         @Override
-        public CompletedBatchOperation get(long timeout, TimeUnit unit)
-                throws InterruptedException, ExecutionException,
-                TimeoutException {
-
-            if (isDone()) {
-                return overall;
-            }
-            boolean success = true;
-            Set<FlowRule> failed = Sets.newHashSet();
-            Set<Long> failedIds = Sets.newHashSet();
-            CompletedBatchOperation completed;
-            for (Future<CompletedBatchOperation> future : futures) {
-                completed = future.get(timeout, unit);
-                success = validateBatchOperation(failed, failedIds, completed);
-            }
-            return finalizeBatchOperation(success, failed, failedIds);
-        }
-
-        private boolean validateBatchOperation(Set<FlowRule> failed,
-                                               Set<Long> failedIds,
-                                               CompletedBatchOperation completed) {
-
-            if (isCancelled()) {
-                throw new CancellationException();
-            }
-            if (!completed.isSuccess()) {
-                log.warn("FlowRuleBatch failed: {}", completed);
-                failed.addAll(completed.failedItems());
-                failedIds.addAll(completed.failedIds());
-                cleanUpBatch();
-                cancelAllSubBatches();
-                return false;
-            }
-            return true;
-        }
-
-        private void cancelAllSubBatches() {
-            for (Future<CompletedBatchOperation> f : futures) {
-                f.cancel(true);
+        public void run() {
+            if (stages.size() > 0) {
+                process(stages.remove(0));
+            } else if (!hasFailed.get() && context != null) {
+                context.onSuccess(fops);
             }
         }
 
-        private CompletedBatchOperation finalizeBatchOperation(boolean success,
-                                                               Set<FlowRule> failed,
-                                                               Set<Long> failedIds) {
-            synchronized (this) {
-                if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
-                    if (state.get() == BatchState.FINISHED) {
-                        return overall;
+        private void process(Set<FlowRuleOperation> ops) {
+            Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
+                    ArrayListMultimap.create();
+
+            FlowRuleBatchEntry fbe;
+            for (FlowRuleOperation flowRuleOperation : ops) {
+                switch (flowRuleOperation.type()) {
+                    // FIXME: Brian needs imagination when creating class names.
+                    case ADD:
+                        fbe = new FlowRuleBatchEntry(
+                                FlowRuleBatchEntry.FlowRuleOperation.ADD, flowRuleOperation.rule());
+                        break;
+                    case MODIFY:
+                        fbe = new FlowRuleBatchEntry(
+                                FlowRuleBatchEntry.FlowRuleOperation.MODIFY, flowRuleOperation.rule());
+                        break;
+                    case REMOVE:
+                        fbe = new FlowRuleBatchEntry(
+                                FlowRuleBatchEntry.FlowRuleOperation.REMOVE, flowRuleOperation.rule());
+                        break;
+                    default:
+                        throw new UnsupportedOperationException("Unknown flow rule type " + flowRuleOperation.type());
+                }
+                pendingDevices.add(flowRuleOperation.rule().deviceId());
+                perDeviceBatches.put(flowRuleOperation.rule().deviceId(), fbe);
+            }
+
+
+            for (DeviceId deviceId : perDeviceBatches.keySet()) {
+                Long id = idGenerator.getNewId();
+                final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
+                                               deviceId, id);
+                pendingFlowOperations.put(id, this);
+                deviceInstallers.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        store.storeBatch(b);
                     }
-                    throw new CancellationException();
-                }
-                overall = new CompletedBatchOperation(success, failed, failedIds);
-                return overall;
+                });
             }
         }
 
-        private void cleanUpBatch() {
-            log.debug("cleaning up batch");
-            // TODO convert these into a batch?
-            for (FlowRuleBatchEntry fbe : batches.values()) {
-                if (fbe.operator() == FlowRuleOperation.ADD ||
-                    fbe.operator() == FlowRuleOperation.MODIFY) {
-                    store.deleteFlowRule(fbe.target());
-                } else if (fbe.operator() == FlowRuleOperation.REMOVE) {
-                    store.removeFlowRule(new DefaultFlowEntry(fbe.target()));
-                    store.storeFlowRule(fbe.target());
-                }
+        public void satisfy(DeviceId devId) {
+            pendingDevices.remove(devId);
+            if (pendingDevices.isEmpty()) {
+                operationsService.submit(this);
             }
         }
+
+
+
+        public void fail(DeviceId devId, Set<? extends FlowRule> failures) {
+            hasFailed.set(true);
+            pendingDevices.remove(devId);
+            if (pendingDevices.isEmpty()) {
+                operationsService.submit(this);
+            }
+
+            if (context != null) {
+                final FlowRuleOperations.Builder failedOpsBuilder =
+                    FlowRuleOperations.builder();
+                failures.stream().forEach(failedOpsBuilder::add);
+
+                context.onError(failedOpsBuilder.build());
+            }
+        }
+
     }
 }