DistributedFlowRuleStore: remote batch support

Change-Id: I373a942697624440e025a8022a13394396058a71
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 85f928a..e5aa3e8 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -3,15 +3,20 @@
 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
 import static org.slf4j.LoggerFactory.getLogger;
 import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
+import static org.onlab.util.Tools.namedThreads;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.List;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -22,7 +27,9 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.ApplicationId;
 import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.net.Device;
 import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceService;
 import org.onlab.onos.net.flow.CompletedBatchOperation;
 import org.onlab.onos.net.flow.DefaultFlowEntry;
 import org.onlab.onos.net.flow.FlowEntry;
@@ -52,8 +59,12 @@
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 
 /**
  * Manages inventory of flow rules using a distributed state management protocol.
@@ -74,13 +85,26 @@
             ArrayListMultimap.<Short, FlowRule>create();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ReplicaInfoService replicaInfoManager;
+    protected ReplicaInfoService replicaInfoManager;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ClusterCommunicationService clusterCommunicator;
+    protected ClusterCommunicationService clusterCommunicator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ClusterService clusterService;
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    private final AtomicInteger localBatchIdGen = new AtomicInteger();
+
+
+    // FIXME switch to expiraing map/Cache?
+    private Map<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures = Maps.newConcurrentMap();
+
+    private final ExecutorService futureListeners =
+            Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
+
 
     protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
         @Override
@@ -97,36 +121,26 @@
 
     @Activate
     public void activate() {
-        clusterCommunicator.addSubscriber(STORE_FLOW_RULE, new ClusterMessageHandler() {
+        clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
 
             @Override
-            public void handle(ClusterMessage message) {
-                FlowRule rule = SERIALIZER.decode(message.payload());
-                log.info("received add request for {}", rule);
-                storeFlowRule(rule);
-                // FIXME what to respond.
-                try {
-                    message.respond(SERIALIZER.encode("ACK"));
-                } catch (IOException e) {
-                    log.error("Failed to respond back", e);
-                }
-            }
-        });
+            public void handle(final ClusterMessage message) {
+                FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
+                log.info("received batch request {}", operation);
+                final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
 
-        clusterCommunicator.addSubscriber(DELETE_FLOW_RULE, new ClusterMessageHandler() {
+                f.addListener(new Runnable() {
 
-            @Override
-            public void handle(ClusterMessage message) {
-                FlowRule rule = SERIALIZER.decode(message.payload());
-                log.info("received delete request for {}", rule);
-                deleteFlowRule(rule);
-                // FIXME what to respond.
-                try {
-                    message.respond(SERIALIZER.encode("ACK"));
-                } catch (IOException e) {
-                    log.error("Failed to respond back", e);
-                }
-
+                    @Override
+                    public void run() {
+                         CompletedBatchOperation result = Futures.getUnchecked(f);
+                        try {
+                            message.respond(SERIALIZER.encode(result));
+                        } catch (IOException e) {
+                            log.error("Failed to respond back", e);
+                        }
+                    }
+                }, futureListeners);
             }
         });
 
@@ -159,7 +173,13 @@
     // make it device specific.
     @Override
     public int getFlowRuleCount() {
-        return flowEntries.size();
+        // implementing in-efficient operation for debugging purpose.
+        int sum = 0;
+        for (Device device : deviceService.getDevices()) {
+            final DeviceId did = device.id();
+            sum += Iterables.size(getFlowEntries(did));
+        }
+        return sum;
     }
 
     @Override
@@ -218,6 +238,7 @@
         storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
     }
 
+    @Override
     public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
         if (operation.getOperations().isEmpty()) {
             return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
@@ -236,7 +257,7 @@
 
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
-                FlowStoreMessageSubjects.STORE_FLOW_RULE,
+                APPLY_BATCH_FLOWS,
                 SERIALIZER.encode(operation));
 
         try {
@@ -250,7 +271,7 @@
         return null;
     }
 
-    private Future<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
+    private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
         List<FlowEntry> toRemove = new ArrayList<>();
         List<FlowEntry> toAdd = new ArrayList<>();
         // TODO: backup changes to hazelcast map
@@ -261,8 +282,8 @@
                 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
                 if (entry != null) {
                     entry.setState(FlowEntryState.PENDING_REMOVE);
+                    toRemove.add(entry);
                 }
-                toRemove.add(entry);
             } else if (op.equals(FlowRuleOperation.ADD)) {
                 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
                 DeviceId deviceId = flowRule.deviceId();
@@ -276,9 +297,13 @@
         if (toAdd.isEmpty() && toRemove.isEmpty()) {
             return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
         }
-        notifyDelegate(FlowRuleBatchEvent.create(new FlowRuleBatchRequest(toAdd, toRemove)));
-        // TODO: imlpement this.
-        return Futures.immediateFailedFuture(new RuntimeException("Implement this."));
+
+        SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
+        final int batchId = localBatchIdGen.incrementAndGet();
+
+        pendingFutures.put(batchId, r);
+        notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
+        return r;
     }
 
     @Override
@@ -293,18 +318,9 @@
             return addOrUpdateFlowRuleInternal(rule);
         }
 
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                FlowStoreMessageSubjects.ADD_OR_UPDATE_FLOW_RULE,
-                SERIALIZER.encode(rule));
-
-        try {
-            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException e) {
-            // FIXME: throw a FlowStoreException
-            throw new RuntimeException(e);
-        }
+        log.error("Tried to update FlowRule {} state,"
+                + " while the Node was not the master.", rule);
+        return null;
     }
 
     private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
@@ -338,18 +354,9 @@
             return removeFlowRuleInternal(rule);
         }
 
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                FlowStoreMessageSubjects.REMOVE_FLOW_RULE,
-                SERIALIZER.encode(rule));
-
-        try {
-            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException e) {
-            // FIXME: throw a FlowStoreException
-            throw new RuntimeException(e);
-        }
+        log.error("Tried to remove FlowRule {},"
+                + " while the Node was not the master.", rule);
+        return null;
     }
 
     private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
@@ -364,6 +371,11 @@
 
     @Override
     public void batchOperationComplete(FlowRuleBatchEvent event) {
+        SettableFuture<CompletedBatchOperation> future
+            = pendingFutures.get(event.subject().batchId());
+        if (future != null) {
+            future.set(event.result());
+        }
         notifyDelegate(event);
     }
 }