Initial DistributedFlowRuleStore

- known bug: responding to ClusterMessage not possible.

Change-Id: Iaa4245c64d2a6219d7c48ed30ddca7d558dbc177
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index e5b2ed6..3da5d25 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -2,6 +2,7 @@
 
 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
 import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -30,6 +31,7 @@
 import org.onlab.onos.store.AbstractStore;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
 import org.onlab.onos.store.flow.ReplicaInfo;
 import org.onlab.onos.store.flow.ReplicaInfoService;
@@ -80,10 +82,44 @@
     };
 
     // TODO: make this configurable
-    private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 1000;
+    private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
 
     @Activate
     public void activate() {
+        clusterCommunicator.addSubscriber(STORE_FLOW_RULE, new ClusterMessageHandler() {
+
+            @Override
+            public void handle(ClusterMessage message) {
+                FlowRule rule = SERIALIZER.decode(message.payload());
+                log.info("received add request for {}", rule);
+                storeFlowEntryInternal(rule);
+                // FIXME what to respond.
+                try {
+                    // FIXME: #respond() not working. responded message is
+                    // handled by this sender node and never goes back.
+                    message.respond(SERIALIZER.encode("ACK"));
+                } catch (IOException e) {
+                    log.error("Failed to respond back", e);
+                }
+            }
+        });
+
+        clusterCommunicator.addSubscriber(DELETE_FLOW_RULE, new ClusterMessageHandler() {
+
+            @Override
+            public void handle(ClusterMessage message) {
+                FlowRule rule = SERIALIZER.decode(message.payload());
+                log.info("received delete request for {}", rule);
+                deleteFlowRuleInternal(rule);
+                // FIXME what to respond.
+                try {
+                    message.respond(SERIALIZER.encode("ACK"));
+                } catch (IOException e) {
+                    log.error("Failed to respond back", e);
+                }
+
+            }
+        });
         log.info("Started");
     }
 
@@ -131,13 +167,14 @@
     }
 
     @Override
-    public void storeFlowRule(FlowRule rule) {
+    public boolean storeFlowRule(FlowRule rule) {
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
-            storeFlowEntryInternal(rule);
-            return;
+            return storeFlowEntryInternal(rule);
         }
 
+        log.warn("Not my flow forwarding to {}", replicaInfo.master().orNull());
+
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
                 FlowStoreMessageSubjects.STORE_FLOW_RULE,
@@ -150,26 +187,29 @@
             // FIXME: throw a FlowStoreException
             throw new RuntimeException(e);
         }
+        return false;
     }
 
-    private synchronized void storeFlowEntryInternal(FlowRule flowRule) {
+    private synchronized boolean storeFlowEntryInternal(FlowRule flowRule) {
         StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
         DeviceId deviceId = flowRule.deviceId();
         // write to local copy.
         if (!flowEntries.containsEntry(deviceId, flowEntry)) {
             flowEntries.put(deviceId, flowEntry);
             flowEntriesById.put(flowRule.appId(), flowEntry);
+            notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, flowRule));
+            return true;
         }
         // write to backup.
         // TODO: write to a hazelcast map.
+        return false;
     }
 
     @Override
-    public synchronized void deleteFlowRule(FlowRule rule) {
+    public synchronized boolean deleteFlowRule(FlowRule rule) {
         ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
         if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
-            deleteFlowRuleInternal(rule);
-            return;
+            return deleteFlowRuleInternal(rule);
         }
 
         ClusterMessage message = new ClusterMessage(
@@ -184,15 +224,21 @@
             // FIXME: throw a FlowStoreException
             throw new RuntimeException(e);
         }
+        return false;
     }
 
-    private synchronized void deleteFlowRuleInternal(FlowRule flowRule) {
+    private synchronized boolean deleteFlowRuleInternal(FlowRule flowRule) {
         StoredFlowEntry entry = getFlowEntryInternal(flowRule);
         if (entry == null) {
-            return;
+            return false;
         }
         entry.setState(FlowEntryState.PENDING_REMOVE);
+
         // TODO: also update backup.
+
+        notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, flowRule));
+
+        return true;
     }
 
     @Override