Initial DistributedFlowRuleStore

- known bug: responding to ClusterMessage not possible.

Change-Id: Iaa4245c64d2a6219d7c48ed30ddca7d558dbc177
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 9ea99c3..525946e 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -104,24 +104,52 @@
     public void applyFlowRules(FlowRule... flowRules) {
         for (int i = 0; i < flowRules.length; i++) {
             FlowRule f = flowRules[i];
-            final Device device = deviceService.getDevice(f.deviceId());
-            final FlowRuleProvider frp = getProvider(device.providerId());
-            store.storeFlowRule(f);
-            frp.applyFlowRule(f);
+            boolean local = store.storeFlowRule(f);
+            if (local) {
+                // TODO: aggregate all local rules and push down once?
+                applyFlowRulesToProviders(f);
+            }
+        }
+    }
+
+    private void applyFlowRulesToProviders(FlowRule... flowRules) {
+        DeviceId did = null;
+        FlowRuleProvider frp = null;
+        for (FlowRule f : flowRules) {
+            if (!f.deviceId().equals(did)) {
+                did = f.deviceId();
+                final Device device = deviceService.getDevice(did);
+                frp = getProvider(device.providerId());
+            }
+            if (frp != null) {
+                frp.applyFlowRule(f);
+            }
         }
     }
 
     @Override
     public void removeFlowRules(FlowRule... flowRules) {
         FlowRule f;
-        FlowRuleProvider frp;
-        Device device;
         for (int i = 0; i < flowRules.length; i++) {
             f = flowRules[i];
-            device = deviceService.getDevice(f.deviceId());
-            store.deleteFlowRule(f);
-            if (device != null) {
+            boolean local = store.deleteFlowRule(f);
+            if (local) {
+                // TODO: aggregate all local rules and push down once?
+                removeFlowRulesFromProviders(f);
+            }
+        }
+    }
+
+    private void removeFlowRulesFromProviders(FlowRule... flowRules) {
+        DeviceId did = null;
+        FlowRuleProvider frp = null;
+        for (FlowRule f : flowRules) {
+            if (!f.deviceId().equals(did)) {
+                did = f.deviceId();
+                final Device device = deviceService.getDevice(did);
                 frp = getProvider(device.providerId());
+            }
+            if (frp != null) {
                 frp.removeFlowRule(f);
             }
         }
@@ -135,8 +163,11 @@
 
         for (FlowRule f : rules) {
             store.deleteFlowRule(f);
+            // FIXME: only accept request and push to provider on internal event
             device = deviceService.getDevice(f.deviceId());
             frp = getProvider(device.providerId());
+            // FIXME: flows removed from store and flows removed from provider might diverge
+            //        get rid of #removeRulesById?
             frp.removeRulesById(id, f);
         }
     }
@@ -352,7 +383,23 @@
     private class InternalStoreDelegate implements FlowRuleStoreDelegate {
         @Override
         public void notify(FlowRuleEvent event) {
-            eventDispatcher.post(event);
+            switch (event.type()) {
+            case RULE_ADD_REQUESTED:
+                applyFlowRulesToProviders(event.subject());
+                break;
+            case RULE_REMOVE_REQUESTED:
+                removeFlowRulesFromProviders(event.subject());
+                break;
+
+            case RULE_ADDED:
+            case RULE_REMOVED:
+            case RULE_UPDATED:
+                // only dispatch events related to switch
+                eventDispatcher.post(event);
+                break;
+            default:
+                break;
+            }
         }
     }