Fetch flows in buckets to avoid large memory allocations/serialization in flow store

Additionally adds FlowRuleStoreException to make checkstyle happy again

Change-Id: I998c8eb8a730e10ac29b1dd1df8cefb4668b262c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 04c6ecf..9a68356 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -15,6 +15,24 @@
 */
 package org.onosproject.store.flow.impl;
 
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Streams;
@@ -45,6 +63,7 @@
 import org.onosproject.net.flow.FlowRuleStore;
 import org.onosproject.net.flow.FlowRuleStoreDelegate;
 import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.net.flow.FlowRuleStoreException;
 import org.onosproject.net.flow.TableStatisticsEntry;
 import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
 import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
@@ -80,21 +99,6 @@
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
-import java.util.Collections;
-import java.util.Dictionary;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.lang.Math.max;
 import static java.lang.Math.min;
@@ -104,7 +108,6 @@
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
-import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
@@ -328,8 +331,6 @@
             REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
         clusterCommunicator.addSubscriber(
             GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
-        clusterCommunicator.addSubscriber(
-            GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
         clusterCommunicator.<Pair<DeviceId, FlowEntryState>, Integer>addSubscriber(
             GET_DEVICE_FLOW_COUNT,
             serializer::decode,
@@ -341,7 +342,6 @@
 
     private void unregisterMessageHandlers() {
         clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
-        clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
         clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
         clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
         clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
@@ -418,28 +418,7 @@
 
     @Override
     public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
-        NodeId master = mastershipService.getMasterFor(deviceId);
-
-        if (master == null) {
-            log.debug("Failed to getFlowEntries: No master for {}", deviceId);
-            return Collections.emptyList();
-        }
-
-        if (Objects.equals(local, master)) {
-            return flowTable.getFlowEntries(deviceId);
-        }
-
-        log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
-            master, deviceId);
-
-        return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
-            ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
-            serializer::encode,
-            serializer::decode,
-            master),
-            FLOW_RULE_STORE_TIMEOUT_MILLIS,
-            TimeUnit.MILLISECONDS,
-            Collections.emptyList());
+        return flowTable.getFlowEntries(deviceId);
     }
 
     @Override
@@ -780,9 +759,7 @@
             if (state == null) {
                 return getFlowRuleCount(deviceId);
             }
-            return (int) getFlowTable(deviceId)
-                .getFlowEntries()
-                .stream()
+            return (int) StreamSupport.stream(getFlowEntries(deviceId).spliterator(), false)
                 .filter(rule -> rule.state() == state)
                 .count();
         }
@@ -803,8 +780,17 @@
          * @param deviceId the device for which to lookup flow entries
          * @return the set of flow entries for the given device
          */
-        public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
-            return getFlowTable(deviceId).getFlowEntries();
+        public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+            try {
+                return getFlowTable(deviceId).getFlowEntries()
+                    .get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+            } catch (ExecutionException e) {
+                throw new FlowRuleStoreException(e.getCause());
+            } catch (TimeoutException e) {
+                throw new FlowRuleStoreException.Timeout();
+            } catch (InterruptedException e) {
+                throw new FlowRuleStoreException.Interrupted();
+            }
         }
 
         /**