Initial implementation for DistributedFlowRuleStore utilizing master/backup replication
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 084435f..5062416 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -3,14 +3,20 @@
 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
 import static org.slf4j.LoggerFactory.getLogger;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.ApplicationId;
+import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.flow.DefaultFlowEntry;
 import org.onlab.onos.net.flow.FlowEntry;
@@ -21,6 +27,13 @@
 import org.onlab.onos.net.flow.FlowRuleStore;
 import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
 import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
+import org.onlab.onos.store.flow.ReplicaInfo;
+import org.onlab.onos.store.serializers.DistributedStoreSerializers;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
 import org.slf4j.Logger;
 
 import com.google.common.collect.ArrayListMultimap;
@@ -28,9 +41,8 @@
 import com.google.common.collect.Multimap;
 
 /**
- * Manages inventory of flow rules using trivial in-memory implementation.
+ * Manages inventory of flow rules using a distributed state management protocol.
  */
-//FIXME I LIE. I AIN'T DISTRIBUTED
 @Component(immediate = true)
 @Service
 public class DistributedFlowRuleStore
@@ -46,6 +58,28 @@
     private final Multimap<Short, FlowRule> flowEntriesById =
             ArrayListMultimap.<Short, FlowRule>create();
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ReplicaInfoManager replicaInfoManager;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterService clusterService;
+
+    protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoPool.newBuilder()
+                    .register(DistributedStoreSerializers.COMMON)
+                    .build()
+                    .populate(1);
+        }
+    };
+
+    // TODO: make this configurable
+    private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 1000;
+
     @Activate
     public void activate() {
         log.info("Started");
@@ -91,26 +125,92 @@
     }
 
     @Override
-    public synchronized void storeFlowRule(FlowRule rule) {
-        FlowEntry f = new DefaultFlowEntry(rule);
-        DeviceId did = f.deviceId();
-        if (!flowEntries.containsEntry(did, f)) {
-            flowEntries.put(did, f);
-            flowEntriesById.put(rule.appId(), f);
+    public void storeFlowRule(FlowRule rule) {
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+        if (replicaInfo.master().get().equals(clusterService.getLocalNode())) {
+            storeFlowEntryInternal(rule);
+            return;
         }
+
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                FlowStoreMessageSubjects.STORE_FLOW_RULE,
+                SERIALIZER.encode(rule));
+
+        try {
+            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        } catch (IOException | TimeoutException e) {
+            // FIXME: throw a FlowStoreException
+            throw new RuntimeException(e);
+        }
+    }
+
+    public synchronized void storeFlowEntryInternal(FlowRule flowRule) {
+        FlowEntry flowEntry = new DefaultFlowEntry(flowRule);
+        DeviceId deviceId = flowRule.deviceId();
+        // write to local copy.
+        if (!flowEntries.containsEntry(deviceId, flowEntry)) {
+            flowEntries.put(deviceId, flowEntry);
+            flowEntriesById.put(flowRule.appId(), flowEntry);
+        }
+        // write to backup.
+        // TODO: write to a hazelcast map.
     }
 
     @Override
     public synchronized void deleteFlowRule(FlowRule rule) {
-        FlowEntry entry = getFlowEntry(rule);
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+        if (replicaInfo.master().get().equals(clusterService.getLocalNode())) {
+            deleteFlowRuleInternal(rule);
+            return;
+        }
+
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                FlowStoreMessageSubjects.DELETE_FLOW_RULE,
+                SERIALIZER.encode(rule));
+
+        try {
+            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        } catch (IOException | TimeoutException e) {
+            // FIXME: throw a FlowStoreException
+            throw new RuntimeException(e);
+        }
+    }
+
+    public synchronized void deleteFlowRuleInternal(FlowRule flowRule) {
+        FlowEntry entry = getFlowEntry(flowRule);
         if (entry == null) {
             return;
         }
         entry.setState(FlowEntryState.PENDING_REMOVE);
+        // TODO: also update backup.
     }
 
     @Override
-    public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
+    public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+        if (replicaInfo.master().get().equals(clusterService.getLocalNode())) {
+            return addOrUpdateFlowRuleInternal(rule);
+        }
+
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                FlowStoreMessageSubjects.ADD_OR_UPDATE_FLOW_RULE,
+                SERIALIZER.encode(rule));
+
+        try {
+            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+        } catch (IOException | TimeoutException e) {
+            // FIXME: throw a FlowStoreException
+            throw new RuntimeException(e);
+        }
+    }
+
+    private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
         DeviceId did = rule.deviceId();
 
         // check if this new rule is an update to an existing entry
@@ -128,15 +228,39 @@
 
         flowEntries.put(did, rule);
         return null;
+
+        // TODO: also update backup.
     }
 
     @Override
-    public synchronized FlowRuleEvent removeFlowRule(FlowEntry rule) {
+    public FlowRuleEvent removeFlowRule(FlowEntry rule) {
+        ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
+        if (replicaInfo.master().get().equals(clusterService.getLocalNode())) {
+            // bypass and handle it locally
+            return removeFlowRuleInternal(rule);
+        }
+
+        ClusterMessage message = new ClusterMessage(
+                clusterService.getLocalNode().id(),
+                FlowStoreMessageSubjects.REMOVE_FLOW_RULE,
+                SERIALIZER.encode(rule));
+
+        try {
+            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
+            return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+        } catch (IOException | TimeoutException e) {
+            // FIXME: throw a FlowStoreException
+            throw new RuntimeException(e);
+        }
+    }
+
+    private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
         // This is where one could mark a rule as removed and still keep it in the store.
         if (flowEntries.remove(rule.deviceId(), rule)) {
             return new FlowRuleEvent(RULE_REMOVED, rule);
         } else {
             return null;
         }
+        // TODO: also update backup.
     }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
new file mode 100644
index 0000000..a43dad6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
@@ -0,0 +1,15 @@
+package org.onlab.onos.store.flow.impl;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+/**
+ * MessageSubjects used by DistributedFlowRuleStore peer-peer communication.
+ */
+public final class FlowStoreMessageSubjects {
+    private FlowStoreMessageSubjects() {}
+    public static final  MessageSubject STORE_FLOW_RULE = new MessageSubject("peer-forward-store-flow-rule");
+    public static final MessageSubject DELETE_FLOW_RULE = new MessageSubject("peer-forward-delete-flow-rule");
+    public static final MessageSubject ADD_OR_UPDATE_FLOW_RULE =
+        new MessageSubject("peer-forward-add-or-update-flow-rule");
+    public static final MessageSubject REMOVE_FLOW_RULE = new MessageSubject("peer-forward-remove-flow-rule");
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
index b63f844..f17c268 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
@@ -26,6 +26,7 @@
 import org.onlab.onos.net.PortNumber;
 import org.onlab.onos.net.device.DefaultDeviceDescription;
 import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.flow.DefaultFlowRule;
 import org.onlab.onos.net.host.DefaultHostDescription;
 import org.onlab.onos.net.host.HostDescription;
 import org.onlab.onos.net.link.DefaultLinkDescription;
@@ -86,7 +87,8 @@
                     Timestamp.class,
                     HostId.class,
                     HostDescription.class,
-                    DefaultHostDescription.class
+                    DefaultHostDescription.class,
+                    DefaultFlowRule.class
                     )
             .register(URI.class, new URISerializer())
             .register(NodeId.class, new NodeIdSerializer())