Fetch flows in buckets to avoid large memory allocations and serialization in the flow store
Additionally, add FlowRuleStoreException so that checkstyle passes again
Change-Id: I998c8eb8a730e10ac29b1dd1df8cefb4668b262c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 04c6ecf..9a68356 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -15,6 +15,24 @@
*/
package org.onosproject.store.flow.impl;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
@@ -45,6 +63,7 @@
import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.net.flow.FlowRuleStoreException;
import org.onosproject.net.flow.TableStatisticsEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
@@ -80,21 +99,6 @@
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
-import java.util.Collections;
-import java.util.Dictionary;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.Math.max;
import static java.lang.Math.min;
@@ -104,7 +108,6 @@
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
-import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
@@ -328,8 +331,6 @@
REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
clusterCommunicator.addSubscriber(
GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
- clusterCommunicator.addSubscriber(
- GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
clusterCommunicator.<Pair<DeviceId, FlowEntryState>, Integer>addSubscriber(
GET_DEVICE_FLOW_COUNT,
serializer::decode,
@@ -341,7 +342,6 @@
private void unregisterMessageHandlers() {
clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
- clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
@@ -418,28 +418,7 @@
@Override
public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
- NodeId master = mastershipService.getMasterFor(deviceId);
-
- if (master == null) {
- log.debug("Failed to getFlowEntries: No master for {}", deviceId);
- return Collections.emptyList();
- }
-
- if (Objects.equals(local, master)) {
- return flowTable.getFlowEntries(deviceId);
- }
-
- log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
- master, deviceId);
-
- return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
- ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
- serializer::encode,
- serializer::decode,
- master),
- FLOW_RULE_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- Collections.emptyList());
+ return flowTable.getFlowEntries(deviceId);
}
@Override
@@ -780,9 +759,7 @@
if (state == null) {
return getFlowRuleCount(deviceId);
}
- return (int) getFlowTable(deviceId)
- .getFlowEntries()
- .stream()
+ return (int) StreamSupport.stream(getFlowEntries(deviceId).spliterator(), false)
.filter(rule -> rule.state() == state)
.count();
}
@@ -803,8 +780,17 @@
* @param deviceId the device for which to lookup flow entries
* @return the set of flow entries for the given device
*/
- public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
- return getFlowTable(deviceId).getFlowEntries();
+ public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+ try {
+ return getFlowTable(deviceId).getFlowEntries()
+ .get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new FlowRuleStoreException(e.getCause());
+ } catch (TimeoutException e) {
+ throw new FlowRuleStoreException.Timeout();
+ } catch (InterruptedException e) {
+ throw new FlowRuleStoreException.Interrupted();
+ }
}
/**