Implementation of new Flow Subsystem:

The subsystem no longer returns futures for tracking completion of work.
Notifications are explicitely returned via a call back mechanism. Also, the
subsystem is now asynchronous.

Change-Id: I1a4cef931c24820f9ae9ed9a5398f163f05dfbc9

more flowservice improvements

Change-Id: I5c9c1b6be4b2ebfa523b64f6f52e7634b7d3e05f

more flowservice impl

Change-Id: I05f6774460effb53ced8c36844bcda2f8f6c096f

Manager to store functional (at least i believe it)

Change-Id: I09b04989bd1004c98fe0bafed4c76714b9155d53

flow subsystem functional: need to fix unit tests

Change-Id: I1667f25b91320f625a03e5e1d5e92823184d9de0

flow subsystem functional

Change-Id: I429b3335c16d4fc16f5d55f233dd37c4d1d6111d

finished refactor of flow subsystem

Change-Id: I1899abc6ff6a974a2018d936cc555049c70a6804

fix for null flow provider to use new api

Change-Id: If2fd9bd5baf74d9c61c5c8085cef8bc2d204cbdc
diff --git a/core/api/src/main/java/org/onosproject/net/flow/CompletedBatchOperation.java b/core/api/src/main/java/org/onosproject/net/flow/CompletedBatchOperation.java
index a17f7a3..fce7ee9 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/CompletedBatchOperation.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/CompletedBatchOperation.java
@@ -21,6 +21,7 @@
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableSet;
+import org.onosproject.net.DeviceId;
 
 /**
  * Representation of a completed flow rule batch operation.
@@ -30,19 +31,22 @@
     private final boolean success;
     private final Set<FlowRule> failures;
     private final Set<Long> failedIds;
+    private final DeviceId deviceId;
 
     /**
      * Creates a new batch completion result.
      *
-     * @param success  indicates whether the completion is successful.
+     * @param success  indicates whether the completion is successful
      * @param failures set of any failures encountered
      * @param failedIds (optional) set of failed operation ids
+     * @param deviceId the device this operation completed for
      */
     public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures,
-                                   Set<Long> failedIds) {
+                                   Set<Long> failedIds, DeviceId deviceId) {
         this.success = success;
         this.failures = ImmutableSet.copyOf(failures);
         this.failedIds = ImmutableSet.copyOf(failedIds);
+        this.deviceId = deviceId;
     }
 
     /**
@@ -51,10 +55,12 @@
      * @param success  indicates whether the completion is successful.
      * @param failures set of any failures encountered
      */
-    public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures) {
+    public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures,
+                                   DeviceId deviceId) {
         this.success = success;
         this.failures = ImmutableSet.copyOf(failures);
         this.failedIds = Collections.emptySet();
+        this.deviceId = deviceId;
     }
 
 
@@ -73,12 +79,17 @@
         return failedIds;
     }
 
+    public DeviceId deviceId() {
+        return this.deviceId;
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
                 .add("success?", success)
                 .add("failedItems", failures)
                 .add("failedIds", failedIds)
+                .add("deviceId", deviceId)
                 .toString();
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchEvent.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchEvent.java
index ab72d8b..9bcace2 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchEvent.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchEvent.java
@@ -16,12 +16,14 @@
 package org.onosproject.net.flow;
 
 import org.onosproject.event.AbstractEvent;
+import org.onosproject.net.DeviceId;
 
 /**
  * Describes flow rule batch event.
  */
 public final class FlowRuleBatchEvent extends AbstractEvent<FlowRuleBatchEvent.Type, FlowRuleBatchRequest> {
 
+
     /**
      * Type of flow rule events.
      */
@@ -42,14 +44,17 @@
     }
 
     private final CompletedBatchOperation result;
+    private final DeviceId deviceId;
 
     /**
      * Constructs a new FlowRuleBatchEvent.
-     * @param request batch operation request.
+     *
+     * @param request batch operation request
+     * @param deviceId the device this batch will be processed on
      * @return event.
      */
-    public static FlowRuleBatchEvent requested(FlowRuleBatchRequest request) {
-        FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, null);
+    public static FlowRuleBatchEvent requested(FlowRuleBatchRequest request, DeviceId deviceId) {
+        FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, deviceId);
         return event;
     }
 
@@ -73,13 +78,36 @@
     }
 
     /**
+     * Returns the deviceId for this batch.
+     * @return device id
+     */
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    /**
      * Creates an event of a given type and for the specified flow rule batch.
      *
      * @param type    flow rule batch event type
-     * @param batch    event flow rule batch subject
+     * @param request event flow rule batch subject
+     * @param result  the result of the batch operation
      */
     private FlowRuleBatchEvent(Type type, FlowRuleBatchRequest request, CompletedBatchOperation result) {
         super(type, request);
         this.result = result;
+        this.deviceId = result.deviceId();
+    }
+
+    /**
+     * Creates an event of a given type and for the specified flow rule batch.
+     *
+     * @param type      flow rule batch event type
+     * @param request   event flow rule batch subject
+     * @param deviceId  the device id for this batch
+     */
+    private FlowRuleBatchEvent(Type type, FlowRuleBatchRequest request, DeviceId deviceId) {
+        super(type, request);
+        this.result = null;
+        this.deviceId = deviceId;
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchOperation.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchOperation.java
index f9bc632..e13acd7 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchOperation.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchOperation.java
@@ -15,12 +15,37 @@
  */
 package org.onosproject.net.flow;
 
+import org.onosproject.net.DeviceId;
+
 import java.util.Collection;
 
+/**
+ * Class used with the flow subsystem to process per device
+ * batches.
+ */
 public class FlowRuleBatchOperation
     extends BatchOperation<FlowRuleBatchEntry> {
 
-    public FlowRuleBatchOperation(Collection<FlowRuleBatchEntry> operations) {
+    /**
+     * This id is used to cary to id of the original
+     * FlowOperations and track where this batch operation
+     * came from. The id is unique cluster wide.
+     */
+    private final long id;
+    private final DeviceId deviceId;
+
+    public FlowRuleBatchOperation(Collection<FlowRuleBatchEntry> operations,
+                                  DeviceId deviceId, long flowOperationId) {
         super(operations);
+        this.id = flowOperationId;
+        this.deviceId = deviceId;
+    }
+
+    public DeviceId deviceId() {
+        return this.deviceId;
+    }
+
+    public long id() {
+        return id;
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java
index 35db171..5570a40 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleBatchRequest.java
@@ -15,59 +15,43 @@
  */
 package org.onosproject.net.flow;
 
+import com.google.common.collect.Lists;
+import org.onosproject.net.DeviceId;
+
 import java.util.Collections;
 import java.util.List;
-
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-
-
-
-import com.google.common.collect.Lists;
+import java.util.Set;
 
 public class FlowRuleBatchRequest {
 
-    private final int batchId;
-    private final List<FlowRuleBatchEntry> toAdd;
-    private final List<FlowRuleBatchEntry> toRemove;
+    /**
+     * This id is used to cary to id of the original
+     * FlowOperations and track where this batch operation
+     * came from. The id is unique cluster wide.
+     */
+    private final long batchId;
 
-    public FlowRuleBatchRequest(int batchId, List<FlowRuleBatchEntry> toAdd,
-                                List<FlowRuleBatchEntry> toRemove) {
+    private final Set<FlowRuleBatchEntry> ops;
+
+
+    public FlowRuleBatchRequest(long batchId, Set<FlowRuleBatchEntry> ops) {
         this.batchId = batchId;
-        this.toAdd = Collections.unmodifiableList(toAdd);
-        this.toRemove = Collections.unmodifiableList(toRemove);
+        this.ops = Collections.unmodifiableSet(ops);
+
+
     }
 
-    public List<FlowRule> toAdd() {
-        return FluentIterable.from(toAdd).transform(
-                new Function<FlowRuleBatchEntry, FlowRule>() {
-
-            @Override
-            public FlowRule apply(FlowRuleBatchEntry input) {
-                return input.target();
-            }
-        }).toList();
+    public Set<FlowRuleBatchEntry> ops() {
+        return ops;
     }
 
-    public List<FlowRule> toRemove() {
-        return FluentIterable.from(toRemove).transform(
-                new Function<FlowRuleBatchEntry, FlowRule>() {
-
-                    @Override
-                    public FlowRule apply(FlowRuleBatchEntry input) {
-                        return input.target();
-                    }
-                }).toList();
-    }
-
-    public FlowRuleBatchOperation asBatchOperation() {
+    public FlowRuleBatchOperation asBatchOperation(DeviceId deviceId) {
         List<FlowRuleBatchEntry> entries = Lists.newArrayList();
-        entries.addAll(toAdd);
-        entries.addAll(toRemove);
-        return new FlowRuleBatchOperation(entries);
+        entries.addAll(ops);
+        return new FlowRuleBatchOperation(entries, deviceId, batchId);
     }
 
-    public int batchId() {
+    public long batchId() {
         return batchId;
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperation.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperation.java
new file mode 100644
index 0000000..82d43be
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperation.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flow;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Representation of an operation on a flow rule table.
+ */
+public class FlowRuleOperation {
+
+    /**
+     * Type of flow table operations.
+     */
+    public enum Type {
+        ADD,
+        MODIFY,
+        REMOVE
+    }
+
+    private final FlowRule rule;
+    private final Type type;
+
+    public FlowRuleOperation(FlowRule rule, Type type) {
+        this.rule = rule;
+        this.type = type;
+    }
+
+    /**
+     * Returns the type of operation.
+     *
+     * @return type
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Returns the flow rule.
+     *
+     * @return flow rule
+     */
+    public FlowRule rule() {
+        return rule;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("rule", rule)
+                .add("type", type)
+                .toString();
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java
new file mode 100644
index 0000000..498cc05
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flow;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.onosproject.net.flow.FlowRuleOperation.Type.*;
+
+/**
+ * A batch of flow rule operations that are broken into stages.
+ * TODO move this up to parent's package
+ */
+public class FlowRuleOperations {
+
+    private final List<Set<FlowRuleOperation>> stages;
+    private final FlowRuleOperationsContext callback; // TODO consider Optional
+
+    private FlowRuleOperations(List<Set<FlowRuleOperation>> stages,
+                               FlowRuleOperationsContext cb) {
+        this.stages = stages;
+        this.callback = cb;
+    }
+
+    // kryo-constructor
+    protected FlowRuleOperations() {
+        this.stages = Lists.newArrayList();
+        this.callback = null;
+    }
+
+    /**
+     * Returns the flow rule operations as sets of stages that should be
+     * executed sequentially.
+     *
+     * @return flow rule stages
+     */
+    public List<Set<FlowRuleOperation>> stages() {
+        return stages;
+    }
+
+    /**
+     * Returns the callback for this batch of operations.
+     *
+     * @return callback
+     */
+    public FlowRuleOperationsContext callback() {
+        return callback;
+    }
+
+    /**
+     * Returns a new builder.
+     *
+     * @return new builder
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("stages", stages)
+                .toString();
+    }
+
+    /**
+     * A builder for constructing flow rule operations.
+     */
+    public static final class Builder {
+
+        private final ImmutableList.Builder<Set<FlowRuleOperation>> listBuilder = ImmutableList.builder();
+        private ImmutableSet.Builder<FlowRuleOperation> currentStage = ImmutableSet.builder();
+
+        // prevent use of the default constructor outside of this file; use the above method
+        private Builder() {}
+
+        /**
+         * Appends a flow rule add to the current stage.
+         *
+         * @param flowRule flow rule
+         * @return this
+         */
+        public Builder add(FlowRule flowRule) {
+            currentStage.add(new FlowRuleOperation(flowRule, ADD));
+            return this;
+        }
+
+        /**
+         * Appends a flow rule modify to the current stage.
+         *
+         * @param flowRule flow rule
+         * @return this
+         */
+        public Builder modify(FlowRule flowRule) {
+            currentStage.add(new FlowRuleOperation(flowRule, MODIFY));
+            return this;
+        }
+
+        /**
+         * Appends a flow rule remove to the current stage.
+         *
+         * @param flowRule flow rule
+         * @return this
+         */
+        // FIXME this is confusing, consider renaming
+        public Builder remove(FlowRule flowRule) {
+            currentStage.add(new FlowRuleOperation(flowRule, REMOVE));
+            return this;
+        }
+
+        /**
+         * Closes the current stage.
+         */
+        private void closeStage() {
+            ImmutableSet<FlowRuleOperation> stage = currentStage.build();
+            if (!stage.isEmpty()) {
+                listBuilder.add(stage);
+            }
+        }
+
+        /**
+         * Closes the current stage and starts a new one.
+         *
+         * @return this
+         */
+        public Builder newStage() {
+            closeStage();
+            currentStage = ImmutableSet.builder();
+            return this;
+        }
+
+        /**
+         * Builds the immutable flow rule operations.
+         *
+         * @return flow rule operations
+         */
+        public FlowRuleOperations build() {
+            return build(null);
+        }
+
+        /**
+         * Builds the immutable flow rule operations.
+         *
+         * @param cb the callback to call when this operation completes
+         * @return flow rule operations
+         */
+        public FlowRuleOperations build(FlowRuleOperationsContext cb) {
+            closeStage();
+            return new FlowRuleOperations(listBuilder.build(), cb);
+        }
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperationsContext.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperationsContext.java
new file mode 100644
index 0000000..c405b12
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperationsContext.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.net.flow;
+
+/**
+ * The context of a flow rule operations that will become the subject of
+ * the notification.
+ *
+ * Implementations of this class must be serializable.
+ */
+public interface FlowRuleOperationsContext {
+    // TODO we might also want to execute a method on behalf of the app
+    default void onSuccess(FlowRuleOperations ops){}
+    default void onError(FlowRuleOperations ops){}
+}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
index aae6902..066ac68 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProvider.java
@@ -18,8 +18,6 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.net.provider.Provider;
 
-import java.util.concurrent.Future;
-
 /**
  * Abstraction of a flow rule provider.
  */
@@ -56,8 +54,7 @@
      * Installs a batch of flow rules. Each flowrule is associated to an
      * operation which results in either addition, removal or modification.
      * @param batch a batch of flow rules
-     * @return a future indicating the status of this execution
      */
-    Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
+    void executeBatch(FlowRuleBatchOperation batch);
 
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java
index 9456719..4c9ebcb 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java
@@ -40,4 +40,13 @@
      */
     void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries);
 
+    /**
+     * Indicates to the core that the requested batch operation has
+     * been completed.
+     *
+     * @param batchId the batch which was processed
+     * @param operation the resulting outcome of the operation
+     */
+    void batchOperationCompleted(long batchId, CompletedBatchOperation operation);
+
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
index 78772c2..957ecf2 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
@@ -15,11 +15,11 @@
  */
 package org.onosproject.net.flow;
 
-import java.util.concurrent.Future;
-
 import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
 
+import java.util.concurrent.Future;
+
 /**
  * Service for injecting flow rules into the environment and for obtaining
  * information about flow rules already in the environment. This implements
@@ -30,6 +30,11 @@
 public interface FlowRuleService {
 
     /**
+     * The topic used for obtaining globally unique ids.
+     */
+    static String FLOW_OP_TOPIC = "flow-ops-ids";
+
+    /**
      * Returns the number of flow rules in the system.
      *
      * @return flow rule count
@@ -96,11 +101,20 @@
      * Applies a batch operation of FlowRules.
      *
      * @param batch batch operation to apply
-     * @return future indicating the state of the batch operation
+     * @return future indicating the state of the batch operation, due to the
+     * deprecation of this api the future will immediately return
      */
+    @Deprecated
     Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch);
 
     /**
+     * Applies a batch operation of FlowRules.
+     *
+     * @param ops batch operation to apply
+     */
+    void apply(FlowRuleOperations ops);
+
+    /**
      * Adds the specified flow rule listener.
      *
      * @param listener flow rule listener
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
index 6fca2c3..d3aebba 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
@@ -15,8 +15,6 @@
  */
 package org.onosproject.net.flow;
 
-import java.util.concurrent.Future;
-
 import org.onosproject.net.DeviceId;
 import org.onosproject.store.Store;
 
@@ -54,6 +52,7 @@
      *
      * @param rule the flow rule to add
      */
+    @Deprecated
     void storeFlowRule(FlowRule rule);
 
     /**
@@ -61,10 +60,9 @@
      *
      * @param batchOperation batch of flow rules.
      *           A batch can contain flow rules for a single device only.
-     * @return Future response indicating success/failure of the batch operation
-     * all the way down to the device.
+     *
      */
-    Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation batchOperation);
+    void storeBatch(FlowRuleBatchOperation batchOperation);
 
     /**
      * Invoked on the completion of a storeBatch operation.