Fetch flows in buckets to avoid large memory allocations/serialization in flow store
Additionally adds FlowRuleStoreException to make checkstyle happy again
Change-Id: I998c8eb8a730e10ac29b1dd1df8cefb4668b262c
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStoreException.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStoreException.java
new file mode 100644
index 0000000..2e1c922
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStoreException.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2015-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.flow;
+
+/**
+ * Top level exception for FlowRuleStore failures.
+ */
+@SuppressWarnings("serial")
+public class FlowRuleStoreException extends RuntimeException {
+ public FlowRuleStoreException() {
+ }
+
+ public FlowRuleStoreException(String message) {
+ super(message);
+ }
+
+ public FlowRuleStoreException(Throwable t) {
+ super(t);
+ }
+
+ /**
+ * Flowrule store operation timeout.
+ */
+ public static class Timeout extends FlowRuleStoreException {
+ }
+
+ /**
+ * Flowrule store operation interrupted.
+ */
+ public static class Interrupted extends FlowRuleStoreException {
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index 8ba93c6..0ffd59c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -16,6 +16,7 @@
package org.onosproject.store.flow.impl;
import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -30,6 +31,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.onlab.util.KryoNamespace;
@@ -77,6 +79,7 @@
private final MessageSubject getDigestsSubject;
private final MessageSubject getBucketSubject;
private final MessageSubject backupSubject;
+ private final MessageSubject getFlowsSubject;
private final DeviceId deviceId;
private final ClusterCommunicationService clusterCommunicator;
@@ -131,6 +134,7 @@
getDigestsSubject = new MessageSubject(String.format("flow-store-%s-digests", deviceId));
getBucketSubject = new MessageSubject(String.format("flow-store-%s-bucket", deviceId));
backupSubject = new MessageSubject(String.format("flow-store-%s-backup", deviceId));
+ getFlowsSubject = new MessageSubject(String.format("flow-store-%s-flows", deviceId));
addListeners();
@@ -197,11 +201,52 @@
*
* @return the set of flow entries in the table
*/
- public Set<FlowEntry> getFlowEntries() {
- return flowBuckets.values().stream()
- .flatMap(bucket -> bucket.getFlowBucket().values().stream())
- .flatMap(entries -> entries.values().stream())
- .collect(Collectors.toSet());
+ public CompletableFuture<Iterable<FlowEntry>> getFlowEntries() {
+ // Fetch the entries for each bucket in parallel and then concatenate the sets
+ // to create a single iterable.
+ return Tools.allOf(flowBuckets.values()
+ .stream()
+ .map(this::getFlowEntries)
+ .collect(Collectors.toList()))
+ .thenApply(Iterables::concat);
+ }
+
+ /**
+ * Fetches the set of flow entries in the given bucket.
+ *
+ * @param bucketId the bucket for which to fetch flow entries
+ * @return a future to be completed once the flow entries have been retrieved
+ */
+ private CompletableFuture<Set<FlowEntry>> getFlowEntries(BucketId bucketId) {
+ return getFlowEntries(getBucket(bucketId.bucket()));
+ }
+
+ /**
+ * Fetches the set of flow entries in the given bucket.
+ *
+ * @param bucket the bucket for which to fetch flow entries
+ * @return a future to be completed once the flow entries have been retrieved
+ */
+ private CompletableFuture<Set<FlowEntry>> getFlowEntries(FlowBucket bucket) {
+ DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+
+ // If the local node is the master, fetch the entries locally. Otherwise, request the entries
+ // from the current master. Note that there's a change of a brief cycle during a mastership change.
+ if (replicaInfo.isMaster(localNodeId)) {
+ return CompletableFuture.completedFuture(
+ bucket.getFlowBucket().values().stream()
+ .flatMap(entries -> entries.values().stream())
+ .collect(Collectors.toSet()));
+ } else if (replicaInfo.master() != null) {
+ return clusterCommunicator.sendAndReceive(
+ bucket.bucketId(),
+ getFlowsSubject,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ replicaInfo.master());
+ } else {
+ return CompletableFuture.completedFuture(Collections.emptySet());
+ }
}
/**
@@ -840,6 +885,8 @@
receiveWithTimestamp(getDigestsSubject, v -> getDigests());
receiveWithTimestamp(getBucketSubject, this::onGetBucket);
receiveWithTimestamp(backupSubject, this::onBackup);
+ clusterCommunicator.<BucketId, Set<FlowEntry>>addSubscriber(
+ getFlowsSubject, SERIALIZER::decode, this::getFlowEntries, SERIALIZER::encode);
}
/**
@@ -849,6 +896,7 @@
clusterCommunicator.removeSubscriber(getDigestsSubject);
clusterCommunicator.removeSubscriber(getBucketSubject);
clusterCommunicator.removeSubscriber(backupSubject);
+ clusterCommunicator.removeSubscriber(getFlowsSubject);
}
/**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 04c6ecf..9a68356 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -15,6 +15,24 @@
*/
package org.onosproject.store.flow.impl;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
@@ -45,6 +63,7 @@
import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.net.flow.FlowRuleStoreException;
import org.onosproject.net.flow.TableStatisticsEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
@@ -80,21 +99,6 @@
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
-import java.util.Collections;
-import java.util.Dictionary;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.Math.max;
import static java.lang.Math.min;
@@ -104,7 +108,6 @@
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
-import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
@@ -328,8 +331,6 @@
REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
clusterCommunicator.addSubscriber(
GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
- clusterCommunicator.addSubscriber(
- GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
clusterCommunicator.<Pair<DeviceId, FlowEntryState>, Integer>addSubscriber(
GET_DEVICE_FLOW_COUNT,
serializer::decode,
@@ -341,7 +342,6 @@
private void unregisterMessageHandlers() {
clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
- clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
@@ -418,28 +418,7 @@
@Override
public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
- NodeId master = mastershipService.getMasterFor(deviceId);
-
- if (master == null) {
- log.debug("Failed to getFlowEntries: No master for {}", deviceId);
- return Collections.emptyList();
- }
-
- if (Objects.equals(local, master)) {
- return flowTable.getFlowEntries(deviceId);
- }
-
- log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
- master, deviceId);
-
- return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
- ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
- serializer::encode,
- serializer::decode,
- master),
- FLOW_RULE_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- Collections.emptyList());
+ return flowTable.getFlowEntries(deviceId);
}
@Override
@@ -780,9 +759,7 @@
if (state == null) {
return getFlowRuleCount(deviceId);
}
- return (int) getFlowTable(deviceId)
- .getFlowEntries()
- .stream()
+ return (int) StreamSupport.stream(getFlowEntries(deviceId).spliterator(), false)
.filter(rule -> rule.state() == state)
.count();
}
@@ -803,8 +780,17 @@
* @param deviceId the device for which to lookup flow entries
* @return the set of flow entries for the given device
*/
- public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
- return getFlowTable(deviceId).getFlowEntries();
+ public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+ try {
+ return getFlowTable(deviceId).getFlowEntries()
+ .get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new FlowRuleStoreException(e.getCause());
+ } catch (TimeoutException e) {
+ throw new FlowRuleStoreException.Timeout();
+ } catch (InterruptedException e) {
+ throw new FlowRuleStoreException.Interrupted();
+ }
}
/**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
index c57e9eb..e5a7631 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
@@ -29,9 +29,6 @@
public static final MessageSubject GET_FLOW_ENTRY
= new MessageSubject("peer-forward-get-flow-entry");
- public static final MessageSubject GET_DEVICE_FLOW_ENTRIES
- = new MessageSubject("peer-forward-get-device-flow-entries");
-
public static final MessageSubject GET_DEVICE_FLOW_COUNT
= new MessageSubject("peer-forward-get-flow-count");