Fetch flows in buckets to avoid large memory allocations/serialization in flow store

Additionally adds FlowRuleStoreException to make checkstyle happy again

Change-Id: I998c8eb8a730e10ac29b1dd1df8cefb4668b262c
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStoreException.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStoreException.java
new file mode 100644
index 0000000..2e1c922
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStoreException.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2015-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.flow;
+
+/**
+ * Top level exception for FlowRuleStore failures.
+ */
+@SuppressWarnings("serial")
+public class FlowRuleStoreException extends RuntimeException {
+    public FlowRuleStoreException() {
+    }
+
+    public FlowRuleStoreException(String message) {
+        super(message);
+    }
+
+    public FlowRuleStoreException(Throwable t) {
+        super(t);
+    }
+
+    /**
+     * Flowrule store operation timeout.
+     */
+    public static class Timeout extends FlowRuleStoreException {
+    }
+
+    /**
+     * Flowrule store operation interrupted.
+     */
+    public static class Interrupted extends FlowRuleStoreException {
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index 8ba93c6..0ffd59c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.flow.impl;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -30,6 +31,7 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.onlab.util.KryoNamespace;
@@ -77,6 +79,7 @@
     private final MessageSubject getDigestsSubject;
     private final MessageSubject getBucketSubject;
     private final MessageSubject backupSubject;
+    private final MessageSubject getFlowsSubject;
 
     private final DeviceId deviceId;
     private final ClusterCommunicationService clusterCommunicator;
@@ -131,6 +134,7 @@
         getDigestsSubject = new MessageSubject(String.format("flow-store-%s-digests", deviceId));
         getBucketSubject = new MessageSubject(String.format("flow-store-%s-bucket", deviceId));
         backupSubject = new MessageSubject(String.format("flow-store-%s-backup", deviceId));
+        getFlowsSubject = new MessageSubject(String.format("flow-store-%s-flows", deviceId));
 
         addListeners();
 
@@ -197,11 +201,52 @@
      *
      * @return the set of flow entries in the table
      */
-    public Set<FlowEntry> getFlowEntries() {
-        return flowBuckets.values().stream()
-            .flatMap(bucket -> bucket.getFlowBucket().values().stream())
-            .flatMap(entries -> entries.values().stream())
-            .collect(Collectors.toSet());
+    public CompletableFuture<Iterable<FlowEntry>> getFlowEntries() {
+        // Fetch the entries for each bucket in parallel and then concatenate the sets
+        // to create a single iterable.
+        return Tools.allOf(flowBuckets.values()
+            .stream()
+            .map(this::getFlowEntries)
+            .collect(Collectors.toList()))
+            .thenApply(Iterables::concat);
+    }
+
+    /**
+     * Fetches the set of flow entries in the given bucket.
+     *
+     * @param bucketId the bucket for which to fetch flow entries
+     * @return a future to be completed once the flow entries have been retrieved
+     */
+    private CompletableFuture<Set<FlowEntry>> getFlowEntries(BucketId bucketId) {
+        return getFlowEntries(getBucket(bucketId.bucket()));
+    }
+
+    /**
+     * Fetches the set of flow entries in the given bucket.
+     *
+     * @param bucket the bucket for which to fetch flow entries
+     * @return a future to be completed once the flow entries have been retrieved
+     */
+    private CompletableFuture<Set<FlowEntry>> getFlowEntries(FlowBucket bucket) {
+        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+
+        // If the local node is the master, fetch the entries locally. Otherwise, request the entries
+        // from the current master. Note that there's a change of a brief cycle during a mastership change.
+        if (replicaInfo.isMaster(localNodeId)) {
+            return CompletableFuture.completedFuture(
+                bucket.getFlowBucket().values().stream()
+                    .flatMap(entries -> entries.values().stream())
+                    .collect(Collectors.toSet()));
+        } else if (replicaInfo.master() != null) {
+            return clusterCommunicator.sendAndReceive(
+                bucket.bucketId(),
+                getFlowsSubject,
+                SERIALIZER::encode,
+                SERIALIZER::decode,
+                replicaInfo.master());
+        } else {
+            return CompletableFuture.completedFuture(Collections.emptySet());
+        }
     }
 
     /**
@@ -840,6 +885,8 @@
         receiveWithTimestamp(getDigestsSubject, v -> getDigests());
         receiveWithTimestamp(getBucketSubject, this::onGetBucket);
         receiveWithTimestamp(backupSubject, this::onBackup);
+        clusterCommunicator.<BucketId, Set<FlowEntry>>addSubscriber(
+            getFlowsSubject, SERIALIZER::decode, this::getFlowEntries, SERIALIZER::encode);
     }
 
     /**
@@ -849,6 +896,7 @@
         clusterCommunicator.removeSubscriber(getDigestsSubject);
         clusterCommunicator.removeSubscriber(getBucketSubject);
         clusterCommunicator.removeSubscriber(backupSubject);
+        clusterCommunicator.removeSubscriber(getFlowsSubject);
     }
 
     /**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 04c6ecf..9a68356 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -15,6 +15,24 @@
 */
 package org.onosproject.store.flow.impl;
 
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Streams;
@@ -45,6 +63,7 @@
 import org.onosproject.net.flow.FlowRuleStore;
 import org.onosproject.net.flow.FlowRuleStoreDelegate;
 import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.net.flow.FlowRuleStoreException;
 import org.onosproject.net.flow.TableStatisticsEntry;
 import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
 import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
@@ -80,21 +99,6 @@
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
-import java.util.Collections;
-import java.util.Dictionary;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.lang.Math.max;
 import static java.lang.Math.min;
@@ -104,7 +108,6 @@
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
-import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
@@ -328,8 +331,6 @@
             REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
         clusterCommunicator.addSubscriber(
             GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
-        clusterCommunicator.addSubscriber(
-            GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
         clusterCommunicator.<Pair<DeviceId, FlowEntryState>, Integer>addSubscriber(
             GET_DEVICE_FLOW_COUNT,
             serializer::decode,
@@ -341,7 +342,6 @@
 
     private void unregisterMessageHandlers() {
         clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
-        clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
         clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
         clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
         clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
@@ -418,28 +418,7 @@
 
     @Override
     public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
-        NodeId master = mastershipService.getMasterFor(deviceId);
-
-        if (master == null) {
-            log.debug("Failed to getFlowEntries: No master for {}", deviceId);
-            return Collections.emptyList();
-        }
-
-        if (Objects.equals(local, master)) {
-            return flowTable.getFlowEntries(deviceId);
-        }
-
-        log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
-            master, deviceId);
-
-        return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
-            ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
-            serializer::encode,
-            serializer::decode,
-            master),
-            FLOW_RULE_STORE_TIMEOUT_MILLIS,
-            TimeUnit.MILLISECONDS,
-            Collections.emptyList());
+        return flowTable.getFlowEntries(deviceId);
     }
 
     @Override
@@ -780,9 +759,7 @@
             if (state == null) {
                 return getFlowRuleCount(deviceId);
             }
-            return (int) getFlowTable(deviceId)
-                .getFlowEntries()
-                .stream()
+            return (int) StreamSupport.stream(getFlowEntries(deviceId).spliterator(), false)
                 .filter(rule -> rule.state() == state)
                 .count();
         }
@@ -803,8 +780,17 @@
          * @param deviceId the device for which to lookup flow entries
          * @return the set of flow entries for the given device
          */
-        public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
-            return getFlowTable(deviceId).getFlowEntries();
+        public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+            try {
+                return getFlowTable(deviceId).getFlowEntries()
+                    .get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+            } catch (ExecutionException e) {
+                throw new FlowRuleStoreException(e.getCause());
+            } catch (TimeoutException e) {
+                throw new FlowRuleStoreException.Timeout();
+            } catch (InterruptedException e) {
+                throw new FlowRuleStoreException.Interrupted();
+            }
         }
 
         /**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
index c57e9eb..e5a7631 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
@@ -29,9 +29,6 @@
     public static final MessageSubject GET_FLOW_ENTRY
         = new MessageSubject("peer-forward-get-flow-entry");
 
-    public static final MessageSubject GET_DEVICE_FLOW_ENTRIES
-        = new MessageSubject("peer-forward-get-device-flow-entries");
-
     public static final MessageSubject GET_DEVICE_FLOW_COUNT
         = new MessageSubject("peer-forward-get-flow-count");