DistributedFlowRuleStore: remote batch support

Change-Id: I373a942697624440e025a8022a13394396058a71
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java
index 4ba3366..d0d1820 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchEvent.java
@@ -30,7 +30,7 @@
      * @param request batch operation request.
      * @return event.
      */
-    public static FlowRuleBatchEvent create(FlowRuleBatchRequest request) {
+    public static FlowRuleBatchEvent requested(FlowRuleBatchRequest request) {
         FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, null);
         return event;
     }
@@ -41,7 +41,7 @@
      * @param result completed batch operation result.
      * @return event.
      */
-    public static FlowRuleBatchEvent create(FlowRuleBatchRequest request, CompletedBatchOperation result) {
+    public static FlowRuleBatchEvent completed(FlowRuleBatchRequest request, CompletedBatchOperation result) {
         FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_COMPLETED, request, result);
         return event;
     }
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
index 0414fcb..34e3d31 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleBatchRequest.java
@@ -9,10 +9,12 @@
 
 public class FlowRuleBatchRequest {
 
+    private final int batchId;
     private final List<FlowEntry> toAdd;
     private final List<FlowEntry> toRemove;
 
-    public FlowRuleBatchRequest(List<FlowEntry> toAdd, List<FlowEntry> toRemove) {
+    public FlowRuleBatchRequest(int batchId, List<FlowEntry> toAdd, List<FlowEntry> toRemove) {
+        this.batchId = batchId;
         this.toAdd = Collections.unmodifiableList(toAdd);
         this.toRemove = Collections.unmodifiableList(toRemove);
     }
@@ -35,4 +37,8 @@
         }
         return new FlowRuleBatchOperation(entries);
     }
+
+    public int batchId() {
+        return batchId;
+    }
 }
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 3ef9fc8..e91bb89 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -2,12 +2,14 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
+import static org.onlab.util.Tools.namedThreads;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -74,6 +76,8 @@
 
     private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
 
+    private final ExecutorService futureListeners = Executors.newCachedThreadPool(namedThreads("provider-future-listeners"));
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected FlowRuleStore store;
 
@@ -92,6 +96,8 @@
 
     @Deactivate
     public void deactivate() {
+        futureListeners.shutdownNow();
+
         store.unsetDelegate(delegate);
         eventDispatcher.removeSink(FlowRuleEvent.class);
         log.info("Stopped");
@@ -398,9 +404,9 @@
                 result.addListener(new Runnable() {
                     @Override
                     public void run() {
-                        store.batchOperationComplete(FlowRuleBatchEvent.create(request, Futures.getUnchecked(result)));
+                        store.batchOperationComplete(FlowRuleBatchEvent.completed(request, Futures.getUnchecked(result)));
                     }
-                }, Executors.newCachedThreadPool());
+                }, futureListeners);
 
                 break;
             case BATCH_OPERATION_COMPLETED:
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 85f928a..e5aa3e8 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -3,15 +3,20 @@
 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
 import static org.slf4j.LoggerFactory.getLogger;
 import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
+import static org.onlab.util.Tools.namedThreads;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.List;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -22,7 +27,9 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.onos.ApplicationId;
 import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.net.Device;
 import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceService;
 import org.onlab.onos.net.flow.CompletedBatchOperation;
 import org.onlab.onos.net.flow.DefaultFlowEntry;
 import org.onlab.onos.net.flow.FlowEntry;
@@ -52,8 +59,12 @@
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 
 /**
  * Manages inventory of flow rules using a distributed state management protocol.
@@ -74,13 +85,26 @@
             ArrayListMultimap.<Short, FlowRule>create();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ReplicaInfoService replicaInfoManager;
+    protected ReplicaInfoService replicaInfoManager;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ClusterCommunicationService clusterCommunicator;
+    protected ClusterCommunicationService clusterCommunicator;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ClusterService clusterService;
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    private final AtomicInteger localBatchIdGen = new AtomicInteger();
+
+
+    // FIXME switch to expiraing map/Cache?
+    private Map<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures = Maps.newConcurrentMap();
+
+    private final ExecutorService futureListeners =
+            Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
+
 
     protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
         @Override
@@ -97,36 +121,26 @@
 
     @Activate
     public void activate() {
-        clusterCommunicator.addSubscriber(STORE_FLOW_RULE, new ClusterMessageHandler() {
+        clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
 
             @Override
-            public void handle(ClusterMessage message) {
-                FlowRule rule = SERIALIZER.decode(message.payload());
-                log.info("received add request for {}", rule);
-                storeFlowRule(rule);
-                // FIXME what to respond.
-                try {
-                    message.respond(SERIALIZER.encode("ACK"));
-                } catch (IOException e) {
-                    log.error("Failed to respond back", e);
-                }
-            }
-        });
+            public void handle(final ClusterMessage message) {
+                FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
+                log.info("received batch request {}", operation);
+                final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
 
-        clusterCommunicator.addSubscriber(DELETE_FLOW_RULE, new ClusterMessageHandler() {
+                f.addListener(new Runnable() {
 
-            @Override
-            public void handle(ClusterMessage message) {
-                FlowRule rule = SERIALIZER.decode(message.payload());
-                log.info("received delete request for {}", rule);
-                deleteFlowRule(rule);
-                // FIXME what to respond.
-                try {
-                    message.respond(SERIALIZER.encode("ACK"));
-                } catch (IOException e) {
-                    log.error("Failed to respond back", e);
-                }
-
+                    @Override
+                    public void run() {
+                         CompletedBatchOperation result = Futures.getUnchecked(f);
+                        try {
+                            message.respond(SERIALIZER.encode(result));
+                        } catch (IOException e) {
+                            log.error("Failed to respond back", e);
+                        }
+                    }
+                }, futureListeners);
             }
         });
 
@@ -159,7 +173,13 @@
     // make it device specific.
     @Override
     public int getFlowRuleCount() {
-        return flowEntries.size();
+        // implementing in-efficient operation for debugging purpose.
+        int sum = 0;
+        for (Device device : deviceService.getDevices()) {
+            final DeviceId did = device.id();
+            sum += Iterables.size(getFlowEntries(did));
+        }
+        return sum;
     }
 
     @Override
@@ -218,6 +238,7 @@
         storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
     }
 
+    @Override
     public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
         if (operation.getOperations().isEmpty()) {
             return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
@@ -236,7 +257,7 @@
 
         ClusterMessage message = new ClusterMessage(
                 clusterService.getLocalNode().id(),
-                FlowStoreMessageSubjects.STORE_FLOW_RULE,
+                APPLY_BATCH_FLOWS,
                 SERIALIZER.encode(operation));
 
         try {
@@ -250,7 +271,7 @@
         return null;
     }
 
-    private Future<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
+    private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
         List<FlowEntry> toRemove = new ArrayList<>();
         List<FlowEntry> toAdd = new ArrayList<>();
         // TODO: backup changes to hazelcast map
@@ -261,8 +282,8 @@
                 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
                 if (entry != null) {
                     entry.setState(FlowEntryState.PENDING_REMOVE);
+                    toRemove.add(entry);
                 }
-                toRemove.add(entry);
             } else if (op.equals(FlowRuleOperation.ADD)) {
                 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
                 DeviceId deviceId = flowRule.deviceId();
@@ -276,9 +297,13 @@
         if (toAdd.isEmpty() && toRemove.isEmpty()) {
             return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
         }
-        notifyDelegate(FlowRuleBatchEvent.create(new FlowRuleBatchRequest(toAdd, toRemove)));
-        // TODO: imlpement this.
-        return Futures.immediateFailedFuture(new RuntimeException("Implement this."));
+
+        SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
+        final int batchId = localBatchIdGen.incrementAndGet();
+
+        pendingFutures.put(batchId, r);
+        notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
+        return r;
     }
 
     @Override
@@ -293,18 +318,9 @@
             return addOrUpdateFlowRuleInternal(rule);
         }
 
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                FlowStoreMessageSubjects.ADD_OR_UPDATE_FLOW_RULE,
-                SERIALIZER.encode(rule));
-
-        try {
-            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException e) {
-            // FIXME: throw a FlowStoreException
-            throw new RuntimeException(e);
-        }
+        log.error("Tried to update FlowRule {} state,"
+                + " while the Node was not the master.", rule);
+        return null;
     }
 
     private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
@@ -338,18 +354,9 @@
             return removeFlowRuleInternal(rule);
         }
 
-        ClusterMessage message = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                FlowStoreMessageSubjects.REMOVE_FLOW_RULE,
-                SERIALIZER.encode(rule));
-
-        try {
-            ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
-            return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
-        } catch (IOException | TimeoutException e) {
-            // FIXME: throw a FlowStoreException
-            throw new RuntimeException(e);
-        }
+        log.error("Tried to remove FlowRule {},"
+                + " while the Node was not the master.", rule);
+        return null;
     }
 
     private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
@@ -364,6 +371,11 @@
 
     @Override
     public void batchOperationComplete(FlowRuleBatchEvent event) {
+        SettableFuture<CompletedBatchOperation> future
+            = pendingFutures.get(event.subject().batchId());
+        if (future != null) {
+            future.set(event.result());
+        }
         notifyDelegate(event);
     }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
index ca833b8..ef68b55 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/FlowStoreMessageSubjects.java
@@ -7,10 +7,10 @@
  */
 public final class FlowStoreMessageSubjects {
     private FlowStoreMessageSubjects() {}
-    public static final  MessageSubject STORE_FLOW_RULE = new MessageSubject("peer-forward-store-flow-rule");
-    public static final MessageSubject DELETE_FLOW_RULE = new MessageSubject("peer-forward-delete-flow-rule");
-    public static final MessageSubject ADD_OR_UPDATE_FLOW_RULE =
-        new MessageSubject("peer-forward-add-or-update-flow-rule");
-    public static final MessageSubject REMOVE_FLOW_RULE = new MessageSubject("peer-forward-remove-flow-rule");
-    public static final MessageSubject GET_FLOW_ENTRY = new MessageSubject("peer-forward-get-flow-entry");
+
+    public static final  MessageSubject APPLY_BATCH_FLOWS
+        = new MessageSubject("peer-forward-apply-batch");
+
+    public static final MessageSubject GET_FLOW_ENTRY
+        = new MessageSubject("peer-forward-get-flow-entry");
 }
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
index 316a3b4..f9352fe 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
@@ -58,7 +58,6 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     @Activate
     public void activate() {
@@ -76,9 +75,9 @@
             }
         };
 
-        roleMap = new SMap(theInstance.getMap("nodeRoles"), this.serializer);
+        roleMap = new SMap<>(theInstance.<byte[], byte[]>getMap("nodeRoles"), this.serializer);
         roleMap.addEntryListener((new RemoteMasterShipEventHandler()), true);
-        terms = new SMap(theInstance.getMap("terms"), this.serializer);
+        terms = new SMap<>(theInstance.<byte[], byte[]>getMap("terms"), this.serializer);
         clusterSize = theInstance.getAtomicLong("clustersize");
 
         log.info("Started");
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
index 7fddb01..0e9e19c 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
@@ -5,6 +5,7 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.cluster.DefaultControllerNode;
@@ -27,12 +28,15 @@
 import org.onlab.onos.net.PortNumber;
 import org.onlab.onos.net.device.DefaultDeviceDescription;
 import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.flow.CompletedBatchOperation;
 import org.onlab.onos.net.flow.DefaultFlowEntry;
 import org.onlab.onos.net.flow.DefaultFlowRule;
 import org.onlab.onos.net.flow.DefaultTrafficSelector;
 import org.onlab.onos.net.flow.DefaultTrafficTreatment;
 import org.onlab.onos.net.flow.FlowEntry;
 import org.onlab.onos.net.flow.FlowId;
+import org.onlab.onos.net.flow.FlowRuleBatchEntry;
+import org.onlab.onos.net.flow.FlowRuleBatchOperation;
 import org.onlab.onos.net.flow.StoredFlowEntry;
 import org.onlab.onos.net.flow.criteria.Criteria;
 import org.onlab.onos.net.flow.criteria.Criterion;
@@ -80,6 +84,7 @@
                     Arrays.asList().getClass(),
                     HashMap.class,
                     HashSet.class,
+                    LinkedList.class,
                     //
                     //
                     ControllerNode.State.class,
@@ -118,7 +123,11 @@
                     DefaultTrafficTreatment.class,
                     Instructions.DropInstruction.class,
                     Instructions.OutputInstruction.class,
-                    RoleInfo.class
+                    RoleInfo.class,
+                    FlowRuleBatchOperation.class,
+                    CompletedBatchOperation.class,
+                    FlowRuleBatchEntry.class,
+                    FlowRuleBatchEntry.FlowRuleOperation.class
                     )
             .register(URI.class, new URISerializer())
             .register(NodeId.class, new NodeIdSerializer())
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index bbfc263..8210a2f 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -176,8 +176,8 @@
             }
             // new flow rule added
             existing.add(f);
-            notifyDelegate(FlowRuleBatchEvent.create(
-                    new FlowRuleBatchRequest(
+            notifyDelegate(FlowRuleBatchEvent.requested(
+                    new FlowRuleBatchRequest( 1, /* FIXME generate something */
                             Arrays.<FlowEntry>asList(f),
                             Collections.<FlowEntry>emptyList())));
         }
@@ -194,8 +194,8 @@
                     synchronized (entry) {
                         entry.setState(FlowEntryState.PENDING_REMOVE);
                         // TODO: Should we notify only if it's "remote" event?
-                        notifyDelegate(FlowRuleBatchEvent.create(
-                                new FlowRuleBatchRequest(
+                        notifyDelegate(FlowRuleBatchEvent.requested(
+                                new FlowRuleBatchRequest(1, /* FIXME generate something */
                                         Collections.<FlowEntry>emptyList(),
                                         Arrays.<FlowEntry>asList(entry))));
                     }