Implementation of new Flow Subsystem:

The subsystem no longer returns futures for tracking completion of work.
Notifications are explicitly returned via a callback mechanism. Also, the
subsystem is now asynchronous.

Change-Id: I1a4cef931c24820f9ae9ed9a5398f163f05dfbc9

more flowservice improvements

Change-Id: I5c9c1b6be4b2ebfa523b64f6f52e7634b7d3e05f

more flowservice impl

Change-Id: I05f6774460effb53ced8c36844bcda2f8f6c096f

Manager to store functional (at least I believe so)

Change-Id: I09b04989bd1004c98fe0bafed4c76714b9155d53

flow subsystem functional: need to fix unit tests

Change-Id: I1667f25b91320f625a03e5e1d5e92823184d9de0

flow subsystem functional

Change-Id: I429b3335c16d4fc16f5d55f233dd37c4d1d6111d

finished refactor of flow subsystem

Change-Id: I1899abc6ff6a974a2018d936cc555049c70a6804

fix for null flow provider to use new api

Change-Id: If2fd9bd5baf74d9c61c5c8085cef8bc2d204cbdc
diff --git a/providers/null/flow/src/main/java/org/onosproject/provider/nil/flow/impl/NullFlowRuleProvider.java b/providers/null/flow/src/main/java/org/onosproject/provider/nil/flow/impl/NullFlowRuleProvider.java
index e882c76..0686349 100644
--- a/providers/null/flow/src/main/java/org/onosproject/provider/nil/flow/impl/NullFlowRuleProvider.java
+++ b/providers/null/flow/src/main/java/org/onosproject/provider/nil/flow/impl/NullFlowRuleProvider.java
@@ -15,9 +15,7 @@
  */
 package org.onosproject.provider.nil.flow.impl;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -29,12 +27,12 @@
 import org.onlab.util.Timer;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.flow.BatchOperation;
 import org.onosproject.net.flow.CompletedBatchOperation;
 import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.FlowRuleBatchEntry;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
 import org.onosproject.net.flow.FlowRuleProvider;
 import org.onosproject.net.flow.FlowRuleProviderRegistry;
 import org.onosproject.net.flow.FlowRuleProviderService;
@@ -43,7 +41,9 @@
 import org.slf4j.Logger;
 
 import java.util.Collections;
-import java.util.concurrent.Future;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
 import static org.slf4j.LoggerFactory.getLogger;
@@ -59,7 +59,7 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected FlowRuleProviderRegistry providerRegistry;
 
-    private Multimap<DeviceId, FlowEntry> flowTable = HashMultimap.create();
+    private ConcurrentMap<DeviceId, Set<FlowEntry>> flowTable = new ConcurrentHashMap<>();
 
     private FlowRuleProviderService providerService;
 
@@ -88,18 +88,10 @@
     }
 
     @Override
-    public void applyFlowRule(FlowRule... flowRules) {
-        for (int i = 0; i < flowRules.length; i++) {
-            flowTable.put(flowRules[i].deviceId(), new DefaultFlowEntry(flowRules[i]));
-        }
-    }
+    public void applyFlowRule(FlowRule... flowRules) {}
 
     @Override
-    public void removeFlowRule(FlowRule... flowRules) {
-        for (int i = 0; i < flowRules.length; i++) {
-            flowTable.remove(flowRules[i].deviceId(), flowRules[i]);
-        }
-    }
+    public void removeFlowRule(FlowRule... flowRules) {}
 
     @Override
     public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
@@ -107,26 +99,32 @@
     }
 
     @Override
-    public Future<CompletedBatchOperation> executeBatch(
-            BatchOperation<FlowRuleBatchEntry> batch) {
+    public void executeBatch(
+            FlowRuleBatchOperation batch) {
+        Set<FlowEntry> flowRules = flowTable.getOrDefault(batch.deviceId(), Sets.newConcurrentHashSet());
         for (FlowRuleBatchEntry fbe : batch.getOperations()) {
             switch (fbe.operator()) {
                 case ADD:
-                    applyFlowRule(fbe.target());
+                    flowRules.add(new DefaultFlowEntry(fbe.target()));
                     break;
                 case REMOVE:
-                    removeFlowRule(fbe.target());
+                    flowRules.remove(new DefaultFlowEntry(fbe.target()));
                     break;
                 case MODIFY:
-                    removeFlowRule(fbe.target());
-                    applyFlowRule(fbe.target());
+                    FlowEntry entry = new DefaultFlowEntry(fbe.target());
+                    flowRules.remove(entry);
+                    flowRules.add(entry);
                     break;
                 default:
                     log.error("Unknown flow operation: {}", fbe);
             }
         }
-        return Futures.immediateFuture(
-                new CompletedBatchOperation(true, Collections.emptySet()));
+        flowTable.put(batch.deviceId(), flowRules);
+        providerService.batchOperationCompleted(batch.id(),
+                                                new CompletedBatchOperation(
+                                                        true,
+                                                        Collections.emptySet(),
+                                                        batch.deviceId()));
     }
 
     private class StatisticTask implements TimerTask {
@@ -134,10 +132,11 @@
         @Override
         public void run(Timeout to) throws Exception {
             for (DeviceId devId : flowTable.keySet()) {
-                providerService.pushFlowMetrics(devId, flowTable.get(devId));
+                    providerService.pushFlowMetrics(devId,
+                                                    flowTable.getOrDefault(devId, Collections.emptySet()));
             }
-
             timeout = timer.newTimeout(to.getTask(), 5, TimeUnit.SECONDS);
+
         }
     }
 }