Refactor flow rule store to resolve missing flows with old masters following mastership reassignment
Change-Id: I7b7e639c84cbd23fe9ab1f12080f865cdfc7f7f9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BackupOperation.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BackupOperation.java
new file mode 100644
index 0000000..7034118
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BackupOperation.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Objects;
+
+import org.onosproject.cluster.NodeId;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Identifier representing a backup of a distinct bucket to a specific node.
+ */
+public class BackupOperation {
+ private final NodeId nodeId;
+ private final int bucketId;
+
+ BackupOperation(NodeId nodeId, int bucketId) {
+ this.nodeId = nodeId;
+ this.bucketId = bucketId;
+ }
+
+ /**
+ * Returns the node identifier.
+ *
+ * @return the node identifier
+ */
+ public NodeId nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * Returns the bucket identifier.
+ *
+ * @return the bucket identifier
+ */
+ public int bucket() {
+ return bucketId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(nodeId, bucketId);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other != null && other instanceof BackupOperation) {
+ BackupOperation that = (BackupOperation) other;
+ return this.nodeId.equals(that.nodeId)
+ && this.bucketId == that.bucketId;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("nodeId", nodeId())
+ .add("bucket", bucket())
+ .toString();
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BucketId.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BucketId.java
new file mode 100644
index 0000000..33cc304
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/BucketId.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Objects;
+
+import org.onosproject.net.DeviceId;
+
+/**
+ * Represents a distinct device flow bucket.
+ */
+public class BucketId {
+ private final DeviceId deviceId;
+ private final int bucket;
+
+ BucketId(DeviceId deviceId, int bucket) {
+ this.deviceId = deviceId;
+ this.bucket = bucket;
+ }
+
+ /**
+ * Returns the bucket device identifier.
+ *
+ * @return the device identifier
+ */
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ /**
+ * Returns the bucket number.
+ *
+ * @return the bucket number
+ */
+ public int bucket() {
+ return bucket;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(deviceId, bucket);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof BucketId) {
+ BucketId that = (BucketId) other;
+ return this.deviceId.equals(that.deviceId)
+ && this.bucket == that.bucket;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s/%d", deviceId, bucket);
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
new file mode 100644
index 0000000..6bc5b5f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -0,0 +1,822 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.store.LogicalTimestamp;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flow table for all flows associated with a specific device.
+ * <p>
+ * Flows in the table are stored in buckets. Each bucket is mutated and replicated as a single unit. The device flow
+ * table performs communication independent of other device flow tables for more parallelism.
+ * <p>
+ * This implementation uses several different replication protocols. Changes that occur on the device master are
+ * replicated to the backups provided in the {@link DeviceReplicaInfo} for the master's term. Additionally, a periodic
+ * anti-entropy protocol is used to detect missing flows on backups (e.g. due to a node restart). Finally, when a
+ * device mastership change occurs, the new master synchronizes flows with the prior master and/or backups for the
+ * device, allowing mastership to be reassigned to non-backup nodes.
+ */
+public class DeviceFlowTable {
+ private static final int NUM_BUCKETS = 1024;
+ private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(BucketId.class)
+ .register(FlowBucket.class)
+ .register(FlowBucketDigest.class)
+ .register(LogicalTimestamp.class)
+ .register(Timestamped.class)
+ .build());
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final MessageSubject getDigestsSubject;
+ private final MessageSubject getBucketSubject;
+ private final MessageSubject backupSubject;
+
+ private final DeviceId deviceId;
+ private final ClusterCommunicationService clusterCommunicator;
+ private final LifecycleManager lifecycleManager;
+ private final ScheduledExecutorService executorService;
+ private final NodeId localNodeId;
+
+ private final LogicalClock clock = new LogicalClock();
+
+ private volatile DeviceReplicaInfo replicaInfo;
+ private volatile long activeTerm;
+
+ private final LifecycleEventListener lifecycleEventListener = new LifecycleEventListener() {
+ @Override
+ public void event(LifecycleEvent event) {
+ executorService.execute(() -> onLifecycleEvent(event));
+ }
+ };
+
+ private ScheduledFuture<?> backupFuture;
+ private ScheduledFuture<?> antiEntropyFuture;
+
+ private final Map<Integer, Queue<Runnable>> flowTasks = Maps.newConcurrentMap();
+ private final Map<Integer, FlowBucket> flowBuckets = Maps.newConcurrentMap();
+
+ private final Map<BackupOperation, LogicalTimestamp> lastBackupTimes = Maps.newConcurrentMap();
+ private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
+
+ DeviceFlowTable(
+ DeviceId deviceId,
+ ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator,
+ LifecycleManager lifecycleManager,
+ ScheduledExecutorService executorService,
+ long backupPeriod,
+ long antiEntropyPeriod) {
+ this.deviceId = deviceId;
+ this.clusterCommunicator = clusterCommunicator;
+ this.lifecycleManager = lifecycleManager;
+ this.executorService = executorService;
+ this.localNodeId = clusterService.getLocalNode().id();
+
+ addListeners();
+
+ for (int i = 0; i < NUM_BUCKETS; i++) {
+ flowBuckets.put(i, new FlowBucket(new BucketId(deviceId, i)));
+ }
+
+ getDigestsSubject = new MessageSubject(String.format("flow-store-%s-digests", deviceId));
+ getBucketSubject = new MessageSubject(String.format("flow-store-%s-bucket", deviceId));
+ backupSubject = new MessageSubject(String.format("flow-store-%s-backup", deviceId));
+
+ setBackupPeriod(backupPeriod);
+ setAntiEntropyPeriod(antiEntropyPeriod);
+ registerSubscribers();
+
+ startTerm(lifecycleManager.getReplicaInfo());
+ }
+
+ /**
+ * Sets the flow table backup period.
+ *
+ * @param backupPeriod the flow table backup period in milliseconds
+ */
+ synchronized void setBackupPeriod(long backupPeriod) {
+ ScheduledFuture<?> backupFuture = this.backupFuture;
+ if (backupFuture != null) {
+ backupFuture.cancel(false);
+ }
+ this.backupFuture = executorService.scheduleAtFixedRate(
+ this::backup, backupPeriod, backupPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Sets the flow table anti-entropy period.
+ *
+ * @param antiEntropyPeriod the flow table anti-entropy period in milliseconds
+ */
+ synchronized void setAntiEntropyPeriod(long antiEntropyPeriod) {
+ ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
+ if (antiEntropyFuture != null) {
+ antiEntropyFuture.cancel(false);
+ }
+ this.antiEntropyFuture = executorService.scheduleAtFixedRate(
+ this::runAntiEntropy, antiEntropyPeriod, antiEntropyPeriod, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Counts the flows in the table.
+ *
+ * @return the total number of flows in the table
+ */
+ public int count() {
+ return flowBuckets.values().stream()
+ .mapToInt(FlowBucket::count)
+ .sum();
+ }
+
+ /**
+ * Returns the flow entry for the given rule.
+ *
+ * @param rule the rule for which to lookup the flow entry
+ * @return the flow entry for the given rule
+ */
+ public StoredFlowEntry getFlowEntry(FlowRule rule) {
+ return getBucket(rule.id())
+ .getFlowEntries(rule.id())
+ .get(rule);
+ }
+
+ /**
+ * Returns the set of flow entries in the table.
+ *
+ * @return the set of flow entries in the table
+ */
+ public Set<FlowEntry> getFlowEntries() {
+ return flowBuckets.values().stream()
+ .flatMap(bucket -> bucket.getFlowBucket().values().stream())
+ .flatMap(entries -> entries.values().stream())
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Returns the bucket for the given flow identifier.
+ *
+ * @param flowId the flow identifier
+ * @return the bucket for the given flow identifier
+ */
+ private FlowBucket getBucket(FlowId flowId) {
+ return getBucket(bucket(flowId));
+ }
+
+ /**
+ * Returns the bucket with the given identifier.
+ *
+ * @param bucketId the bucket identifier
+ * @return the bucket with the given identifier
+ */
+ private FlowBucket getBucket(int bucketId) {
+ return flowBuckets.get(bucketId);
+ }
+
+ /**
+ * Returns the bucket number for the given flow identifier.
+ *
+ * @param flowId the flow identifier
+ * @return the bucket number for the given flow identifier
+ */
+ private int bucket(FlowId flowId) {
+ return Math.abs((int) (flowId.id() % NUM_BUCKETS));
+ }
+
+ /**
+ * Returns the digests for all buckets in the flow table for the device.
+ *
+ * @return the set of digests for all buckets for the device
+ */
+ private Set<FlowBucketDigest> getDigests() {
+ return flowBuckets.values()
+ .stream()
+ .map(bucket -> bucket.getDigest())
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Returns the digest for the given bucket.
+ *
+ * @param bucket the bucket for which to return the digest
+ * @return the digest for the given bucket
+ */
+ private FlowBucketDigest getDigest(int bucket) {
+ return flowBuckets.get(bucket).getDigest();
+ }
+
+ /**
+ * Adds an entry to the table.
+ *
+ * @param rule the rule to add
+ * @return a future to be completed once the rule has been added
+ */
+ public CompletableFuture<Void> add(FlowEntry rule) {
+ return runInTerm(rule.id(), (bucket, term) -> {
+ bucket.add(rule, term, clock);
+ return null;
+ });
+ }
+
+ /**
+ * Updates an entry in the table.
+ *
+ * @param rule the rule to update
+ * @return a future to be completed once the rule has been updated
+ */
+ public CompletableFuture<Void> update(FlowEntry rule) {
+ return runInTerm(rule.id(), (bucket, term) -> {
+ bucket.update(rule, term, clock);
+ return null;
+ });
+ }
+
+ /**
+ * Applies the given update function to the rule.
+ *
+ * @param rule the rule to update
+ * @param function the update function to apply
+ * @param <T> the result type
+ * @return a future to be completed with the update result or {@code null} if the rule was not updated
+ */
+ public <T> CompletableFuture<T> update(FlowRule rule, Function<StoredFlowEntry, T> function) {
+ return runInTerm(rule.id(), (bucket, term) -> bucket.update(rule, function, term, clock));
+ }
+
+ /**
+ * Removes an entry from the table.
+ *
+ * @param rule the rule to remove
+ * @return a future to be completed once the rule has been removed
+ */
+ public CompletableFuture<FlowEntry> remove(FlowEntry rule) {
+ return runInTerm(rule.id(), (bucket, term) -> bucket.remove(rule, term, clock));
+ }
+
+ /**
+ * Runs the given function in the current term.
+ *
+ * @param flowId the flow identifier indicating the bucket in which to run the function
+ * @param function the function to execute in the current term
+ * @param <T> the future result type
+ * @return a future to be completed with the function result once it has been run
+ */
+ private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> function) {
+ DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+ if (!replicaInfo.isMaster(localNodeId)) {
+ return Tools.exceptionalFuture(new IllegalStateException());
+ }
+
+ FlowBucket bucket = getBucket(flowId);
+
+ // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
+ // the change to be executed once the master has been synchronized.
+ final long term = replicaInfo.term();
+ if (activeTerm < term) {
+ log.debug("Enqueueing operation for device {}", deviceId);
+ synchronized (flowTasks) {
+ // Double checked lock on the active term.
+ if (activeTerm < term) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
+ .add(() -> future.complete(function.apply(bucket, term)));
+ return future;
+ }
+ }
+ }
+ return CompletableFuture.completedFuture(function.apply(bucket, term));
+ }
+
+ /**
+ * Backs up all buckets in the given device to the given node.
+ */
+ private void backup() {
+ DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+
+ // If the local node is not currently the master, skip the backup.
+ if (!replicaInfo.isMaster(localNodeId)) {
+ return;
+ }
+
+ // Otherwise, iterate through backup nodes and backup the device.
+ for (NodeId nodeId : replicaInfo.backups()) {
+ try {
+ backup(nodeId, replicaInfo.term());
+ } catch (Exception e) {
+ log.error("Backup of " + deviceId + " to " + nodeId + " failed", e);
+ }
+ }
+ }
+
+ /**
+ * Backs up all buckets for the device to the given node.
+ *
+ * @param nodeId the node to which to back up the device
+ * @param term the term for which to backup to the node
+ */
+ private void backup(NodeId nodeId, long term) {
+ for (FlowBucket bucket : flowBuckets.values()) {
+ // If the bucket is not in the current term, skip it. This forces synchronization of the bucket
+ // to occur prior to the new master replicating changes in the bucket to backups.
+ if (bucket.term() != term) {
+ continue;
+ }
+
+ // Record the logical timestamp from the bucket to keep track of the highest logical time replicated.
+ LogicalTimestamp timestamp = bucket.timestamp();
+
+ // If the backup can be run (no concurrent backup to the node in progress) then run it.
+ BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
+ if (startBackup(operation, timestamp)) {
+ backup(bucket, nodeId).whenCompleteAsync((succeeded, error) -> {
+ if (error != null) {
+ log.debug("Backup operation {} failed", operation, error);
+ failBackup(operation);
+ } else if (succeeded) {
+ succeedBackup(operation, timestamp);
+ backup(nodeId, term);
+ } else {
+ log.debug("Backup operation {} failed: term mismatch", operation);
+ failBackup(operation);
+ }
+ }, executorService);
+ }
+ }
+ }
+
+ /**
+ * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
+ * <p>
+ * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
+ * are pending replication for the backup operation.
+ *
+ * @param operation the operation to start
+ * @param timestamp the timestamp for which to start the backup operation
+ * @return indicates whether the given backup operation should be started
+ */
+ private boolean startBackup(BackupOperation operation, LogicalTimestamp timestamp) {
+ LogicalTimestamp lastBackupTime = lastBackupTimes.get(operation);
+ return timestamp != null
+ && (lastBackupTime == null || lastBackupTime.isOlderThan(timestamp))
+ && inFlightUpdates.add(operation);
+ }
+
+ /**
+ * Fails the given backup operation.
+ *
+ * @param operation the backup operation to fail
+ */
+ private void failBackup(BackupOperation operation) {
+ inFlightUpdates.remove(operation);
+ }
+
+ /**
+ * Succeeds the given backup operation.
+ * <p>
+ * The last backup time for the operation will be updated and the operation will be removed from
+ * in-flight updates.
+ *
+ * @param operation the operation to succeed
+ * @param timestamp the timestamp at which the operation was <em>started</em>
+ */
+ private void succeedBackup(BackupOperation operation, LogicalTimestamp timestamp) {
+ lastBackupTimes.put(operation, timestamp);
+ inFlightUpdates.remove(operation);
+ }
+
+ /**
+ * Resets the last completion time for the given backup operation to ensure it's replicated again.
+ *
+ * @param operation the backup operation to reset
+ */
+ private void resetBackup(BackupOperation operation) {
+ lastBackupTimes.remove(operation);
+ }
+
+ /**
+ * Performs the given backup operation.
+ *
+ * @param bucket the bucket to backup
+ * @param nodeId the node to which to backup the bucket
+ * @return a future to be completed with a boolean indicating whether the backup operation was successful
+ */
+ private CompletableFuture<Boolean> backup(FlowBucket bucket, NodeId nodeId) {
+ if (log.isDebugEnabled()) {
+ log.debug("Backing up {} flow entries in bucket {} to {}", bucket.count(), bucket.bucketId(), nodeId);
+ }
+ return sendWithTimestamp(bucket, backupSubject, nodeId);
+ }
+
+ /**
+ * Handles a flow bucket backup from a remote peer.
+ *
+ * @param flowBucket the flow bucket to back up
+ * @return the set of flows that could not be backed up
+ */
+ private boolean onBackup(FlowBucket flowBucket) {
+ if (log.isDebugEnabled()) {
+ log.debug("{} - Received {} flow entries in bucket {} to backup",
+ deviceId, flowBucket.count(), flowBucket.bucketId());
+ }
+
+ try {
+ DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+
+ // If the backup is for a different term, reject the request until we learn about the new term.
+ if (flowBucket.term() != replicaInfo.term()) {
+ log.debug("Term mismatch for device {}: {} != {}", deviceId, flowBucket.term(), replicaInfo);
+ return false;
+ }
+
+ flowBuckets.compute(flowBucket.bucketId().bucket(),
+ (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
+ return true;
+ } catch (Exception e) {
+ log.warn("Failure processing backup request", e);
+ return false;
+ }
+ }
+
+ /**
+ * Runs the anti-entropy protocol.
+ */
+ private void runAntiEntropy() {
+ DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+ if (!replicaInfo.isMaster(localNodeId)) {
+ return;
+ }
+
+ for (NodeId nodeId : replicaInfo.backups()) {
+ runAntiEntropy(nodeId);
+ }
+ }
+
+ /**
+ * Runs the anti-entropy protocol against the given peer.
+ *
+ * @param nodeId the node with which to execute the anti-entropy protocol
+ */
+ private void runAntiEntropy(NodeId nodeId) {
+ requestDigests(nodeId).thenAcceptAsync((digests) -> {
+ // Compute a set of missing BucketIds based on digest times and send them back to the master.
+ for (FlowBucketDigest remoteDigest : digests) {
+ FlowBucket localBucket = getBucket(remoteDigest.bucket());
+ if (localBucket.getDigest().isNewerThan(remoteDigest)) {
+ log.debug("Detected missing flow entries on node {} in bucket {}/{}",
+ nodeId, deviceId, remoteDigest.bucket());
+ resetBackup(new BackupOperation(nodeId, remoteDigest.bucket()));
+ }
+ }
+ }, executorService);
+ }
+
+ /**
+ * Sends a digest request to the given node.
+ *
+ * @param nodeId the node to which to send the request
+ * @return future to be completed with the set of digests for the given device on the given node
+ */
+ private CompletableFuture<Set<FlowBucketDigest>> requestDigests(NodeId nodeId) {
+ return sendWithTimestamp(deviceId, getDigestsSubject, nodeId);
+ }
+
+ /**
+ * Synchronizes flows from the previous master or backups.
+ *
+ * @param prevReplicaInfo the previous replica info
+ * @param newReplicaInfo the new replica info
+ */
+ private void syncFlows(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
+ if (prevReplicaInfo == null) {
+ activateMaster(newReplicaInfo);
+ } else if (prevReplicaInfo.master() != null && !prevReplicaInfo.master().equals(localNodeId)) {
+ syncFlowsOnMaster(prevReplicaInfo, newReplicaInfo);
+ } else {
+ syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
+ }
+ }
+
+ /**
+ * Synchronizes flows from the previous master, falling back to backups if the master fails.
+ *
+ * @param prevReplicaInfo the previous replica info
+ * @param newReplicaInfo the new replica info
+ */
+ private void syncFlowsOnMaster(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
+ syncFlowsOn(prevReplicaInfo.master())
+ .whenCompleteAsync((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to synchronize flows on previous master {}", prevReplicaInfo.master(), error);
+ syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
+ } else {
+ activateMaster(newReplicaInfo);
+ }
+ }, executorService);
+ }
+
+ /**
+ * Synchronizes flows from the previous backups.
+ *
+ * @param prevReplicaInfo the previous replica info
+ * @param newReplicaInfo the new replica info
+ */
+ private void syncFlowsOnBackups(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
+ List<NodeId> backups = prevReplicaInfo.backups()
+ .stream()
+ .filter(nodeId -> !nodeId.equals(localNodeId))
+ .collect(Collectors.toList());
+ syncFlowsOn(backups)
+ .whenCompleteAsync((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to synchronize flows on previous backup nodes {}", backups, error);
+ }
+ activateMaster(newReplicaInfo);
+ }, executorService);
+ }
+
+ /**
+ * Synchronizes flows for the device on the given nodes.
+ *
+ * @param nodes the nodes via which to synchronize the flows
+ * @return a future to be completed once flows have been synchronizes
+ */
+ private CompletableFuture<Void> syncFlowsOn(Collection<NodeId> nodes) {
+ return nodes.isEmpty()
+ ? CompletableFuture.completedFuture(null)
+ : Tools.firstOf(nodes.stream()
+ .map(node -> syncFlowsOn(node))
+ .collect(Collectors.toList()))
+ .thenApply(v -> null);
+ }
+
+ /**
+ * Synchronizes flows for the device from the given node.
+ *
+ * @param nodeId the node from which to synchronize flows
+ * @return a future to be completed once the flows have been synchronizes
+ */
+ private CompletableFuture<Void> syncFlowsOn(NodeId nodeId) {
+ return requestDigests(nodeId)
+ .thenCompose(digests -> Tools.allOf(digests.stream()
+ .filter(digest -> digest.isNewerThan(getDigest(digest.bucket())))
+ .map(digest -> syncBucketOn(nodeId, digest.bucket()))
+ .collect(Collectors.toList())))
+ .thenApply(v -> null);
+ }
+
+ /**
+ * Synchronizes the given bucket on the given node.
+ *
+ * @param nodeId the node on which to synchronize the bucket
+ * @param bucketNumber the bucket to synchronize
+ * @return a future to be completed once the bucket has been synchronizes
+ */
+ private CompletableFuture<Void> syncBucketOn(NodeId nodeId, int bucketNumber) {
+ return requestBucket(nodeId, bucketNumber)
+ .thenAcceptAsync(flowBucket -> {
+ flowBuckets.compute(flowBucket.bucketId().bucket(),
+ (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
+ }, executorService);
+ }
+
+ /**
+ * Requests the given bucket from the given node.
+ *
+ * @param nodeId the node from which to request the bucket
+ * @param bucket the bucket to request
+ * @return a future to be completed with the bucket
+ */
+ private CompletableFuture<FlowBucket> requestBucket(NodeId nodeId, int bucket) {
+ log.debug("Requesting flow bucket {} from {}", bucket, nodeId);
+ return sendWithTimestamp(bucket, getBucketSubject, nodeId);
+ }
+
+ /**
+ * Handles a flow bucket request.
+ *
+ * @param bucket the bucket number
+ * @return the flow bucket
+ */
+ private FlowBucket onGetBucket(int bucket) {
+ return flowBuckets.get(bucket);
+ }
+
+ /**
+ * Activates the new master term.
+ *
+ * @param replicaInfo the new replica info
+ */
+ private void activateMaster(DeviceReplicaInfo replicaInfo) {
+ log.debug("Activating term {} for device {}", replicaInfo.term(), deviceId);
+ for (int i = 0; i < NUM_BUCKETS; i++) {
+ activateBucket(i);
+ }
+ lifecycleManager.activate(replicaInfo.term());
+ activeTerm = replicaInfo.term();
+ }
+
+ /**
+ * Activates the given bucket number.
+ *
+ * @param bucket the bucket number to activate
+ */
+ private void activateBucket(int bucket) {
+ Queue<Runnable> tasks;
+ synchronized (flowTasks) {
+ tasks = flowTasks.remove(bucket);
+ }
+ if (tasks != null) {
+ log.debug("Completing enqueued operations for device {}", deviceId);
+ tasks.forEach(task -> task.run());
+ }
+ }
+
+ /**
+ * Handles a lifecycle event.
+ */
+ private void onLifecycleEvent(LifecycleEvent event) {
+ log.debug("Received lifecycle event for device {}: {}", deviceId, event);
+ switch (event.type()) {
+ case TERM_START:
+ startTerm(event.subject());
+ break;
+ case TERM_ACTIVE:
+ activateTerm(event.subject());
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+ * Handles a replica change at the start of a new term.
+ */
+ private void startTerm(DeviceReplicaInfo replicaInfo) {
+ DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
+ this.replicaInfo = replicaInfo;
+ if (replicaInfo.isMaster(localNodeId)) {
+ log.info("Synchronizing device {} flows for term {}", deviceId, replicaInfo.term());
+ syncFlows(oldReplicaInfo, replicaInfo);
+ }
+ }
+
+ /**
+ * Handles the activation of a term.
+ */
+ private void activateTerm(DeviceReplicaInfo replicaInfo) {
+ if (replicaInfo.term() < this.replicaInfo.term()) {
+ return;
+ }
+ if (replicaInfo.term() > this.replicaInfo.term()) {
+ this.replicaInfo = replicaInfo;
+ }
+
+ // If the local node is neither the master or a backup for the device, clear the flow table.
+ if (!replicaInfo.isMaster(localNodeId) && !replicaInfo.isBackup(localNodeId)) {
+ flowBuckets.values().forEach(bucket -> bucket.clear());
+ }
+ activeTerm = replicaInfo.term();
+ }
+
+ /**
+ * Sends a message to the given node wrapped in a Lamport timestamp.
+ * <p>
+ * Messages are sent in a {@link Timestamped} wrapper and are expected to be received in a {@link Timestamped}
+ * wrapper. The internal {@link LogicalClock} is automatically updated on both send and receive.
+ *
+ * @param message the message to send
+ * @param subject the message subject
+ * @param toNodeId the node to which to send the message
+ * @param <M> the message type
+ * @param <R> the response type
+ * @return a future to be completed with the response
+ */
+ private <M, R> CompletableFuture<R> sendWithTimestamp(M message, MessageSubject subject, NodeId toNodeId) {
+ return clusterCommunicator.<Timestamped<M>, Timestamped<R>>sendAndReceive(
+ clock.timestamp(message), subject, SERIALIZER::encode, SERIALIZER::decode, toNodeId)
+ .thenApply(response -> {
+ clock.tick(response.timestamp());
+ return response.value();
+ });
+ }
+
+ /**
+ * Receives messages to the given subject wrapped in Lamport timestamps.
+ * <p>
+ * Messages are expected to be received in a {@link Timestamped} wrapper and are sent back in a {@link Timestamped}
+ * wrapper. The internal {@link LogicalClock} is automatically updated on both receive and send.
+ *
+ * @param subject the subject for which to register the subscriber
+ * @param function the raw message handler
+ * @param <M> the raw message type
+ * @param <R> the raw response type
+ */
+ private <M, R> void receiveWithTimestamp(MessageSubject subject, Function<M, R> function) {
+ clusterCommunicator.<Timestamped<M>, Timestamped<R>>addSubscriber(subject, SERIALIZER::decode, request -> {
+ clock.tick(request.timestamp());
+ return clock.timestamp(function.apply(request.value()));
+ }, SERIALIZER::encode, executorService);
+ }
+
+ /**
+ * Registers internal message subscribers.
+ */
+ private void registerSubscribers() {
+ receiveWithTimestamp(getDigestsSubject, v -> getDigests());
+ receiveWithTimestamp(getBucketSubject, this::onGetBucket);
+ receiveWithTimestamp(backupSubject, this::onBackup);
+ }
+
+ /**
+ * Unregisters internal message subscribers.
+ */
+ private void unregisterSubscribers() {
+ clusterCommunicator.removeSubscriber(getDigestsSubject);
+ clusterCommunicator.removeSubscriber(getBucketSubject);
+ clusterCommunicator.removeSubscriber(backupSubject);
+ }
+
+ /**
+ * Adds internal event listeners.
+ */
+ private void addListeners() {
+ lifecycleManager.addListener(lifecycleEventListener);
+ }
+
+ /**
+ * Removes internal event listeners.
+ */
+ private void removeListeners() {
+ lifecycleManager.removeListener(lifecycleEventListener);
+ }
+
+ /**
+ * Cancels recurrent scheduled futures.
+ */
+ private synchronized void cancelFutures() {
+ ScheduledFuture<?> backupFuture = this.backupFuture;
+ if (backupFuture != null) {
+ backupFuture.cancel(false);
+ }
+
+ ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
+ if (antiEntropyFuture != null) {
+ antiEntropyFuture.cancel(false);
+ }
+ }
+
+ /**
+ * Closes the device flow table.
+ */
+ public void close() {
+ removeListeners();
+ unregisterSubscribers();
+ cancelFutures();
+ lifecycleManager.close();
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceReplicaInfo.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceReplicaInfo.java
new file mode 100644
index 0000000..30f9396
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceReplicaInfo.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.onosproject.cluster.NodeId;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Device term context.
+ */
+public class DeviceReplicaInfo {
+ private final long term;
+ private final NodeId master;
+ private final List<NodeId> backups;
+
+ public DeviceReplicaInfo(long term, NodeId master, List<NodeId> backups) {
+ this.term = term;
+ this.master = master;
+ this.backups = backups;
+ }
+
+ /**
+ * Returns the mastership term.
+ *
+ * @return the mastership term
+ */
+ public long term() {
+ return term;
+ }
+
+ /**
+ * Returns the master for the {@link #term()}.
+ *
+ * @return the master {@link NodeId} for the current {@link #term()}
+ */
+ public NodeId master() {
+ return master;
+ }
+
+ /**
+ * Returns a boolean indicating whether the given {@link NodeId} is the current master.
+ *
+ * @param nodeId the node ID to check
+ * @return indicates whether the given node identifier is the identifier of the current master
+ */
+ public boolean isMaster(NodeId nodeId) {
+ return Objects.equals(master, nodeId);
+ }
+
+ /**
+ * Returns a list of all active backup nodes in priority order.
+ * <p>
+ * The returned backups are limited by the flow rule store's configured backup count.
+ *
+ * @return a list of backup nodes in priority order
+ */
+ public List<NodeId> backups() {
+ return backups;
+ }
+
+ /**
+ * Returns a boolean indicating whether the given node is a backup.
+ *
+ * @param nodeId the node identifier
+ * @return indicates whether the given node is a backup
+ */
+ public boolean isBackup(NodeId nodeId) {
+ return backups.contains(nodeId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(term, master, backups);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (object instanceof DeviceReplicaInfo) {
+ DeviceReplicaInfo that = (DeviceReplicaInfo) object;
+ return this.term == that.term
+ && Objects.equals(this.master, that.master)
+ && Objects.equals(this.backups, that.backups);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("term", term())
+ .add("master", master())
+ .add("backups", backups())
+ .toString();
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 2919f08..19bfd48 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -1,44 +1,38 @@
- /*
- * Copyright 2014-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+/*
+* Copyright 2014-present Open Networking Foundation
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package org.onosproject.store.flow.impl;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
-import com.google.common.util.concurrent.Futures;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -54,14 +48,16 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
import org.onosproject.core.IdGenerator;
+import org.onosproject.event.AbstractListenerManager;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowEntry.FlowEntryState;
-import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleEvent.Type;
@@ -80,14 +76,18 @@
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
@@ -99,8 +99,8 @@
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.APPLY_BATCH_FLOWS;
-import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_ANTI_ENTROPY;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
+import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
@@ -113,8 +113,8 @@
@Component(immediate = true)
@Service
public class ECFlowRuleStore
- extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
- implements FlowRuleStore {
+ extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
+ implements FlowRuleStore {
private final Logger log = getLogger(getClass());
@@ -124,26 +124,25 @@
private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
private static final int DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS = 5000;
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
- private static final int NUM_BUCKETS = 1024;
@Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
- label = "Number of threads in the message handler pool")
+ label = "Number of threads in the message handler pool")
private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
@Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
- label = "Delay in ms between successive backup runs")
+ label = "Delay in ms between successive backup runs")
private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
@Property(name = "antiEntropyPeriod", intValue = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS,
- label = "Delay in ms between anti-entropy runs")
+ label = "Delay in ms between anti-entropy runs")
private int antiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
@Property(name = "persistenceEnabled", boolValue = false,
- label = "Indicates whether or not changes in the flow table should be persisted to disk.")
+ label = "Indicates whether or not changes in the flow table should be persisted to disk.")
private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
@Property(name = "backupCount", intValue = DEFAULT_MAX_BACKUP_COUNT,
- label = "Max number of backup copies for each device")
+ label = "Max number of backup copies for each device")
private volatile int backupCount = DEFAULT_MAX_BACKUP_COUNT;
private InternalFlowTable flowTable = new InternalFlowTable();
@@ -176,14 +175,12 @@
private ExecutorService messageHandlingExecutor;
private ExecutorService eventHandler;
- private ScheduledFuture<?> backupTask;
- private ScheduledFuture<?> antiEntropyTask;
private final ScheduledExecutorService backupSenderExecutor =
- Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
+ Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
- new InternalTableStatsListener();
+ new InternalTableStatsListener();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
@@ -199,7 +196,7 @@
.register(BucketId.class)
.register(MastershipBasedTimestamp.class);
- private EventuallyConsistentMap<BucketId, Integer> flowCounts;
+ protected AsyncConsistentMap<DeviceId, Long> mastershipTermLifecycles;
private IdGenerator idGenerator;
private NodeId local;
@@ -213,50 +210,37 @@
local = clusterService.getLocalNode().id();
eventHandler = Executors.newSingleThreadExecutor(
- groupedThreads("onos/flow", "event-handler", log));
+ groupedThreads("onos/flow", "event-handler", log));
messageHandlingExecutor = Executors.newFixedThreadPool(
- msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
+ msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
registerMessageHandlers(messageHandlingExecutor);
- replicaInfoManager.addListener(flowTable);
- backupTask = backupSenderExecutor.scheduleWithFixedDelay(
- flowTable::backup,
- 0,
- backupPeriod,
- TimeUnit.MILLISECONDS);
- antiEntropyTask = backupSenderExecutor.scheduleWithFixedDelay(
- flowTable::runAntiEntropy,
- 0,
- antiEntropyPeriod,
- TimeUnit.MILLISECONDS);
-
- flowCounts = storageService.<BucketId, Integer>eventuallyConsistentMapBuilder()
- .withName("onos-flow-counts")
- .withSerializer(serializerBuilder)
- .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
- .withTimestampProvider((k, v) -> new WallClockTimestamp())
- .withTombstonesDisabled()
- .build();
+ mastershipTermLifecycles = storageService.<DeviceId, Long>consistentMapBuilder()
+ .withName("onos-flow-store-terms")
+ .withSerializer(serializer)
+ .buildAsyncMap();
deviceTableStats = storageService.<DeviceId, List<TableStatisticsEntry>>eventuallyConsistentMapBuilder()
- .withName("onos-flow-table-stats")
- .withSerializer(serializerBuilder)
- .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
- .withTimestampProvider((k, v) -> new WallClockTimestamp())
- .withTombstonesDisabled()
- .build();
+ .withName("onos-flow-table-stats")
+ .withSerializer(serializerBuilder)
+ .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
+ .withTombstonesDisabled()
+ .build();
deviceTableStats.addListener(tableStatsListener);
+ deviceService.addListener(flowTable);
+ deviceService.getDevices().forEach(device -> flowTable.addDevice(device.id()));
+
logConfig("Started");
}
@Deactivate
public void deactivate(ComponentContext context) {
- replicaInfoManager.removeListener(flowTable);
- backupTask.cancel(true);
configService.unregisterProperties(getClass(), false);
unregisterMessageHandlers();
+ deviceService.removeListener(flowTable);
deviceTableStats.removeListener(tableStatsListener);
deviceTableStats.destroy();
eventHandler.shutdownNow();
@@ -297,48 +281,21 @@
newAntiEntropyPeriod = DEFAULT_ANTI_ENTROPY_PERIOD_MILLIS;
}
- boolean restartBackupTask = false;
- boolean restartAntiEntropyTask = false;
-
if (newBackupPeriod != backupPeriod) {
backupPeriod = newBackupPeriod;
- restartBackupTask = true;
+ flowTable.setBackupPeriod(newBackupPeriod);
}
if (newAntiEntropyPeriod != antiEntropyPeriod) {
antiEntropyPeriod = newAntiEntropyPeriod;
- restartAntiEntropyTask = true;
- }
-
- if (restartBackupTask) {
- if (backupTask != null) {
- // cancel previously running task
- backupTask.cancel(false);
- }
- backupTask = backupSenderExecutor.scheduleWithFixedDelay(
- flowTable::backup,
- 0,
- backupPeriod,
- TimeUnit.MILLISECONDS);
- }
-
- if (restartAntiEntropyTask) {
- if (antiEntropyTask != null) {
- // cancel previously running task
- antiEntropyTask.cancel(false);
- }
- antiEntropyTask = backupSenderExecutor.scheduleWithFixedDelay(
- flowTable::runAntiEntropy,
- 0,
- antiEntropyPeriod,
- TimeUnit.MILLISECONDS);
+ flowTable.setAntiEntropyPeriod(newAntiEntropyPeriod);
}
if (newPoolSize != msgHandlerPoolSize) {
msgHandlerPoolSize = newPoolSize;
ExecutorService oldMsgHandler = messageHandlingExecutor;
messageHandlingExecutor = Executors.newFixedThreadPool(
- msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
+ msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
// replace previously registered handlers.
registerMessageHandlers(messageHandlingExecutor);
@@ -352,50 +309,63 @@
}
private void registerMessageHandlers(ExecutorService executor) {
-
clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
- REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
+ REMOTE_APPLY_COMPLETED, serializer::decode, this::notifyDelegate, executor);
clusterCommunicator.addSubscriber(
- GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
+ GET_FLOW_ENTRY, serializer::decode, flowTable::getFlowEntry, serializer::encode, executor);
clusterCommunicator.addSubscriber(
- GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
+ GET_DEVICE_FLOW_ENTRIES, serializer::decode, flowTable::getFlowEntries, serializer::encode, executor);
clusterCommunicator.addSubscriber(
- REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
+ GET_DEVICE_FLOW_COUNT, serializer::decode, flowTable::getFlowRuleCount, serializer::encode, executor);
clusterCommunicator.addSubscriber(
- FLOW_TABLE_BACKUP, serializer::decode, flowTable::onBackup, serializer::encode, executor);
- clusterCommunicator.addSubscriber(
- FLOW_TABLE_ANTI_ENTROPY, serializer::decode, flowTable::onAntiEntropy, serializer::encode, executor);
+ REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
}
private void unregisterMessageHandlers() {
clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
+ clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_COUNT);
clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
- clusterCommunicator.removeSubscriber(FLOW_TABLE_ANTI_ENTROPY);
}
private void logConfig(String prefix) {
log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}, backupCount = {}",
- prefix, msgHandlerPoolSize, backupPeriod, backupCount);
+ prefix, msgHandlerPoolSize, backupPeriod, backupCount);
}
@Override
public int getFlowRuleCount() {
return Streams.stream(deviceService.getDevices()).parallel()
- .mapToInt(device -> getFlowRuleCount(device.id()))
- .sum();
+ .mapToInt(device -> getFlowRuleCount(device.id()))
+ .sum();
}
@Override
public int getFlowRuleCount(DeviceId deviceId) {
- return flowCounts.entrySet().stream()
- .filter(entry -> entry.getKey().deviceId().equals(deviceId))
- .mapToInt(entry -> entry.getValue())
- .sum();
+ NodeId master = mastershipService.getMasterFor(deviceId);
+ if (master == null) {
+ log.debug("Failed to getFlowRuleCount: No master for {}", deviceId);
+ return 0;
+ }
+
+ if (Objects.equals(local, master)) {
+ return flowTable.getFlowRuleCount(deviceId);
+ }
+
+ log.trace("Forwarding getFlowRuleCount to master {} for device {}", master, deviceId);
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+ deviceId,
+ GET_DEVICE_FLOW_COUNT,
+ serializer::encode,
+ serializer::decode,
+ master),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ 0);
}
@Override
@@ -412,16 +382,16 @@
}
log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
- master, rule.deviceId());
+ master, rule.deviceId());
return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
- ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
- serializer::encode,
- serializer::decode,
- master),
- FLOW_RULE_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- null);
+ ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY,
+ serializer::encode,
+ serializer::decode,
+ master),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ null);
}
@Override
@@ -438,31 +408,31 @@
}
log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
- master, deviceId);
+ master, deviceId);
return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
- ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
- serializer::encode,
- serializer::decode,
- master),
- FLOW_RULE_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- Collections.emptyList());
+ ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
+ serializer::encode,
+ serializer::decode,
+ master),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptyList());
}
@Override
public void storeFlowRule(FlowRule rule) {
storeBatch(new FlowRuleBatchOperation(
- Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
- rule.deviceId(), idGenerator.getNewId()));
+ Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
+ rule.deviceId(), idGenerator.getNewId()));
}
@Override
public void storeBatch(FlowRuleBatchOperation operation) {
if (operation.getOperations().isEmpty()) {
notifyDelegate(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
return;
}
@@ -472,11 +442,13 @@
if (master == null) {
log.warn("No master for {} ", deviceId);
- updateStoreInternal(operation);
-
+ Set<FlowRule> allFailures = operation.getOperations()
+ .stream()
+ .map(op -> op.target())
+ .collect(Collectors.toSet());
notifyDelegate(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(false, allFailures, deviceId)));
return;
}
@@ -486,26 +458,26 @@
}
log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
- master, deviceId);
+ master, deviceId);
clusterCommunicator.unicast(operation,
- APPLY_BATCH_FLOWS,
- serializer::encode,
- master)
- .whenComplete((result, error) -> {
- if (error != null) {
- log.warn("Failed to storeBatch: {} to {}", operation, master, error);
+ APPLY_BATCH_FLOWS,
+ serializer::encode,
+ master)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to storeBatch: {} to {}", operation, master, error);
- Set<FlowRule> allFailures = operation.getOperations()
- .stream()
- .map(op -> op.target())
- .collect(Collectors.toSet());
+ Set<FlowRule> allFailures = operation.getOperations()
+ .stream()
+ .map(op -> op.target())
+ .collect(Collectors.toSet());
- notifyDelegate(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(false, allFailures, deviceId)));
- }
- });
+ notifyDelegate(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(false, allFailures, deviceId)));
+ }
+ });
}
private void storeBatchInternal(FlowRuleBatchOperation operation) {
@@ -515,65 +487,63 @@
Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
if (currentOps.isEmpty()) {
batchOperationComplete(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), did)));
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(true, Collections.emptySet(), did)));
return;
}
notifyDelegate(FlowRuleBatchEvent.requested(new
- FlowRuleBatchRequest(operation.id(),
- currentOps), operation.deviceId()));
+ FlowRuleBatchRequest(operation.id(),
+ currentOps), operation.deviceId()));
}
private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
return operation.getOperations().stream().map(
- op -> {
- StoredFlowEntry entry;
- switch (op.operator()) {
- case ADD:
- entry = new DefaultFlowEntry(op.target());
- flowTable.add(entry);
+ op -> {
+ StoredFlowEntry entry;
+ switch (op.operator()) {
+ case ADD:
+ entry = new DefaultFlowEntry(op.target());
+ flowTable.add(entry);
+ return op;
+ case MODIFY:
+ entry = new DefaultFlowEntry(op.target());
+ flowTable.update(entry);
+ return op;
+ case REMOVE:
+ return flowTable.update(op.target(), stored -> {
+ stored.setState(FlowEntryState.PENDING_REMOVE);
+ log.debug("Setting state of rule to pending remove: {}", stored);
return op;
- case MODIFY:
- entry = new DefaultFlowEntry(op.target());
- flowTable.update(entry);
- return op;
- case REMOVE:
- entry = flowTable.getFlowEntry(op.target());
- if (entry != null) {
- entry.setState(FlowEntryState.PENDING_REMOVE);
- flowTable.update(entry);
- log.debug("Setting state of rule to pending remove: {}", entry);
- return op;
- }
- break;
- default:
- log.warn("Unknown flow operation operator: {}", op.operator());
- }
- return null;
+ });
+ default:
+ log.warn("Unknown flow operation operator: {}", op.operator());
}
+ return null;
+ }
).filter(Objects::nonNull).collect(Collectors.toSet());
}
@Override
public void deleteFlowRule(FlowRule rule) {
storeBatch(
- new FlowRuleBatchOperation(
- Collections.singletonList(
- new FlowRuleBatchEntry(
- FlowRuleOperation.REMOVE,
- rule)), rule.deviceId(), idGenerator.getNewId()));
+ new FlowRuleBatchOperation(
+ Collections.singletonList(
+ new FlowRuleBatchEntry(
+ FlowRuleOperation.REMOVE,
+ rule)), rule.deviceId(), idGenerator.getNewId()));
}
@Override
public FlowRuleEvent pendingFlowRule(FlowEntry rule) {
if (mastershipService.isLocalMaster(rule.deviceId())) {
- StoredFlowEntry stored = flowTable.getFlowEntry(rule);
- if (stored != null &&
- stored.state() != FlowEntryState.PENDING_ADD) {
- stored.setState(FlowEntryState.PENDING_ADD);
- return new FlowRuleEvent(Type.RULE_UPDATED, rule);
- }
+ return flowTable.update(rule, stored -> {
+ if (stored.state() == FlowEntryState.PENDING_ADD) {
+ stored.setState(FlowEntryState.PENDING_ADD);
+ return new FlowRuleEvent(Type.RULE_UPDATED, rule);
+ }
+ return null;
+ });
}
return null;
}
@@ -586,14 +556,12 @@
}
log.warn("Tried to update FlowRule {} state,"
- + " while the Node was not the master.", rule);
+ + " while the Node was not the master.", rule);
return null;
}
private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
- // check if this new rule is an update to an existing entry
- StoredFlowEntry stored = flowTable.getFlowEntry(rule);
- if (stored != null) {
+ FlowRuleEvent event = flowTable.update(rule, stored -> {
stored.setBytes(rule.bytes());
stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
stored.setLiveType(rule.liveType());
@@ -601,11 +569,12 @@
stored.setLastSeen();
if (stored.state() == FlowEntryState.PENDING_ADD) {
stored.setState(FlowEntryState.ADDED);
- // Update the flow table to ensure the changes are replicated
- flowTable.update(stored);
return new FlowRuleEvent(Type.RULE_ADDED, rule);
}
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
+ });
+ if (event != null) {
+ return event;
}
// TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
@@ -631,14 +600,17 @@
}
log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
- master, deviceId);
+ master, deviceId);
- return Futures.getUnchecked(clusterCommunicator.sendAndReceive(
- rule,
- REMOVE_FLOW_ENTRY,
- serializer::encode,
- serializer::decode,
- master));
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+ rule,
+ REMOVE_FLOW_ENTRY,
+ serializer::encode,
+ serializer::decode,
+ master),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ null);
}
private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
@@ -699,675 +671,173 @@
}
}
- /**
- * Represents a backup to a of a distinct bucket to a distinct node.
- */
- private class BackupOperation {
- private final NodeId nodeId;
- private final BucketId bucketId;
-
- BackupOperation(NodeId nodeId, BucketId bucketId) {
- this.nodeId = nodeId;
- this.bucketId = bucketId;
- }
-
- NodeId nodeId() {
- return nodeId;
- }
-
- BucketId bucketId() {
- return bucketId;
- }
+ private class InternalFlowTable implements DeviceListener {
+ private final Map<DeviceId, DeviceFlowTable> flowTables = Maps.newConcurrentMap();
@Override
- public int hashCode() {
- return Objects.hash(nodeId, bucketId);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other != null && other instanceof BackupOperation) {
- BackupOperation that = (BackupOperation) other;
- return this.nodeId.equals(that.nodeId)
- && this.bucketId.equals(that.bucketId);
- }
- return false;
- }
- }
-
- /**
- * Represents a distinct device flow bucket.
- */
- private class BucketId {
- private final DeviceId deviceId;
- private final int bucket;
-
- BucketId(DeviceId deviceId, int bucket) {
- this.deviceId = deviceId;
- this.bucket = bucket;
- }
-
- DeviceId deviceId() {
- return deviceId;
- }
-
- int bucket() {
- return bucket;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(deviceId, bucket);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other != null && other instanceof BucketId) {
- BucketId that = (BucketId) other;
- return this.deviceId.equals(that.deviceId)
- && this.bucket == that.bucket;
- }
- return false;
- }
- }
-
- /**
- * Container for flows in a specific bucket.
- */
- private class FlowBucket {
- private final BucketId bucketId;
- private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table;
- private final long timestamp;
-
- BucketId bucketId() {
- return bucketId;
- }
-
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table() {
- return table;
- }
-
- long timestamp() {
- return timestamp;
- }
-
- FlowBucket(BucketId bucketId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> table, long timestamp) {
- this.bucketId = bucketId;
- this.table = table;
- this.timestamp = timestamp;
- }
- }
-
- /**
- * Device digest.
- */
- private class DeviceDigest {
- private final DeviceId deviceId;
- private final Set<FlowBucketDigest> digests;
-
- DeviceDigest(DeviceId deviceId, Set<FlowBucketDigest> digests) {
- this.deviceId = deviceId;
- this.digests = digests;
- }
-
- DeviceId deviceId() {
- return deviceId;
- }
-
- Set<FlowBucketDigest> digests() {
- return digests;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(deviceId, digests);
- }
-
- @Override
- public boolean equals(Object object) {
- return object instanceof DeviceDigest
- && ((DeviceDigest) object).deviceId.equals(deviceId);
- }
- }
-
- /**
- * Flow bucket digest.
- */
- private class FlowBucketDigest {
- private final BucketId bucketId;
- private final long timestamp;
-
- FlowBucketDigest(BucketId bucketId, long timestamp) {
- this.bucketId = bucketId;
- this.timestamp = timestamp;
- }
-
- BucketId bucketId() {
- return bucketId;
- }
-
- long timestamp() {
- return timestamp;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(bucketId);
- }
-
- @Override
- public boolean equals(Object object) {
- return object instanceof FlowBucketDigest
- && ((FlowBucketDigest) object).bucketId.equals(bucketId);
- }
- }
-
- private class InternalFlowTable implements ReplicaInfoEventListener {
-
- //TODO replace the Map<V,V> with ExtendedSet
- private final Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
- flowEntries = Maps.newConcurrentMap();
-
- private final Map<BackupOperation, Long> lastBackupTimes = Maps.newConcurrentMap();
- private final Map<BucketId, Long> lastUpdateTimes = Maps.newConcurrentMap();
- private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
-
- private final AtomicLong currentTimestamp = new AtomicLong();
-
- @Override
- public void event(ReplicaInfoEvent event) {
- eventHandler.execute(() -> handleEvent(event));
- }
-
- /**
- * Handles a replica change event.
- *
- * @param event the replica change event to handle
- */
- private void handleEvent(ReplicaInfoEvent event) {
- DeviceId deviceId = event.subject();
-
- // If the local node is not the master, return.
- if (!isMasterNode(deviceId)) {
- // If the local node is neither the master or a backup, remove flow tables for the device.
- if (!isBackupNode(deviceId)) {
- purgeFlowRule(deviceId);
- }
- return;
- }
- backupSenderExecutor.execute(this::runAntiEntropy);
- }
-
- /**
- * Returns the set of devices in the flow table.
- *
- * @return the set of devices in the flow table
- */
- private Set<DeviceId> getDevices() {
- return flowEntries.keySet();
- }
-
- /**
- * Returns the digests for all buckets in the flow table for the given device.
- *
- * @param deviceId the device for which to return digests
- * @return the set of digests for all buckets for the given device
- */
- private Set<FlowBucketDigest> getDigests(DeviceId deviceId) {
- return IntStream.range(0, NUM_BUCKETS)
- .mapToObj(bucket -> {
- BucketId bucketId = new BucketId(deviceId, bucket);
- long timestamp = lastUpdateTimes.getOrDefault(bucketId, 0L);
- return new FlowBucketDigest(bucketId, timestamp);
- }).collect(Collectors.toSet());
- }
-
- /**
- * Returns the flow table for specified device.
- *
- * @param deviceId identifier of the device
- * @return Map representing Flow Table of given device.
- */
- private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
- // Use an external get/null check to avoid locks.
- // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8161372
- if (persistenceEnabled) {
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = flowEntries.get(deviceId);
- return flowTable != null ? flowTable
- : flowEntries.computeIfAbsent(deviceId, id ->
- persistenceService.<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder()
- .withName("FlowTable:" + deviceId.toString())
- .withSerializer(serializer)
- .build());
- } else {
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = flowEntries.get(deviceId);
- return flowTable != null ? flowTable
- : flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
+ public void event(DeviceEvent event) {
+ if (event.type() == DeviceEvent.Type.DEVICE_ADDED) {
+ addDevice(event.subject().id());
}
}
- private FlowBucket getFlowBucket(BucketId bucketId) {
- long timestamp = lastUpdateTimes.getOrDefault(bucketId, 0L);
- return new FlowBucket(bucketId, getFlowTable(bucketId.deviceId())
- .entrySet()
- .stream()
- .filter(entry -> isInBucket(entry.getKey(), bucketId.bucket()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
- timestamp);
+ /**
+ * Adds the given device to the flow table.
+ *
+ * @param deviceId the device to add to the table
+ */
+ public void addDevice(DeviceId deviceId) {
+ flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
+ id,
+ clusterService,
+ clusterCommunicator,
+ new InternalLifecycleManager(id),
+ backupSenderExecutor,
+ backupPeriod,
+ antiEntropyPeriod));
}
- private Map<StoredFlowEntry, StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
- // Use an external get/null check to avoid locks.
- // https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8161372
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = getFlowTable(deviceId);
- Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowTable.get(flowId);
- return flowEntries != null ? flowEntries : flowTable.computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
+ /**
+ * Sets the flow table backup period.
+ *
+ * @param backupPeriod the flow table backup period
+ */
+ void setBackupPeriod(int backupPeriod) {
+ flowTables.values().forEach(flowTable -> flowTable.setBackupPeriod(backupPeriod));
}
- private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
- return getFlowEntriesInternal(rule.deviceId(), rule.id()).get(rule);
+ /**
+ * Sets the flow table anti-entropy period.
+ *
+ * @param antiEntropyPeriod the flow table anti-entropy period
+ */
+ void setAntiEntropyPeriod(int antiEntropyPeriod) {
+ flowTables.values().forEach(flowTable -> flowTable.setAntiEntropyPeriod(antiEntropyPeriod));
}
- private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
- return getFlowTable(deviceId).values().stream()
- .flatMap(m -> m.values().stream())
- .collect(Collectors.toSet());
+ /**
+ * Returns the flow table for a specific device.
+ *
+ * @param deviceId the device identifier
+ * @return the flow table for the given device
+ */
+ private DeviceFlowTable getFlowTable(DeviceId deviceId) {
+ DeviceFlowTable flowTable = flowTables.get(deviceId);
+ return flowTable != null ? flowTable : flowTables.computeIfAbsent(deviceId, id -> new DeviceFlowTable(
+ deviceId,
+ clusterService,
+ clusterCommunicator,
+ new InternalLifecycleManager(deviceId),
+ backupSenderExecutor,
+ backupPeriod,
+ antiEntropyPeriod));
}
+ /**
+ * Returns the flow rule count for the given device.
+ *
+ * @param deviceId the device for which to return the flow rule count
+ * @return the flow rule count for the given device
+ */
+ public int getFlowRuleCount(DeviceId deviceId) {
+ return getFlowTable(deviceId).count();
+ }
+
+ /**
+ * Returns the flow entry for the given rule.
+ *
+ * @param rule the rule for which to return the flow entry
+ * @return the flow entry for the given rule
+ */
public StoredFlowEntry getFlowEntry(FlowRule rule) {
- return getFlowEntryInternal(rule);
+ return getFlowTable(rule.deviceId()).getFlowEntry(rule);
}
+ /**
+ * Returns the set of flow entries for the given device.
+ *
+ * @param deviceId the device for which to lookup flow entries
+ * @return the set of flow entries for the given device
+ */
public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
- return getFlowEntriesInternal(deviceId);
+ return getFlowTable(deviceId).getFlowEntries();
}
- private boolean isInBucket(FlowId flowId, int bucket) {
- return bucket(flowId) == bucket;
- }
-
- private int bucket(FlowId flowId) {
- return (int) (flowId.id() % NUM_BUCKETS);
- }
-
- private void recordUpdate(BucketId bucketId) {
- recordUpdate(bucketId, currentTimestamp.accumulateAndGet(System.currentTimeMillis(), Math::max));
- }
-
- private void recordUpdate(BucketId bucketId, long timestamp) {
- lastUpdateTimes.put(bucketId, timestamp);
- }
-
+ /**
+ * Adds the given flow rule.
+ *
+ * @param rule the rule to add
+ */
public void add(FlowEntry rule) {
- getFlowEntriesInternal(rule.deviceId(), rule.id())
- .compute((StoredFlowEntry) rule, (k, stored) -> {
- return (StoredFlowEntry) rule;
- });
- recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
+ Tools.futureGetOrElse(
+ getFlowTable(rule.deviceId()).add(rule),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ null);
}
+ /**
+ * Updates the given flow rule.
+ *
+ * @param rule the rule to update
+ */
public void update(FlowEntry rule) {
- getFlowEntriesInternal(rule.deviceId(), rule.id())
- .computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
- if (rule instanceof DefaultFlowEntry) {
- DefaultFlowEntry updated = (DefaultFlowEntry) rule;
- if (stored instanceof DefaultFlowEntry) {
- DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
- if (updated.created() >= storedEntry.created()) {
- recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
- return updated;
- } else {
- log.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
- return stored;
- }
- }
- }
- return stored;
- });
+ Tools.futureGetOrElse(
+ getFlowTable(rule.deviceId()).update(rule),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ null);
}
+ /**
+ * Applies the given update function to the rule.
+ *
+ * @param function the update function to apply
+ * @return a future to be completed with the update event or {@code null} if the rule was not updated
+ */
+ public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function) {
+ return Tools.futureGetOrElse(
+ getFlowTable(rule.deviceId()).update(rule, function),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ null);
+ }
+
+ /**
+ * Removes the given flow rule.
+ *
+ * @param rule the rule to remove
+ */
public FlowEntry remove(FlowEntry rule) {
- final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
- final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowTable = getFlowTable(rule.deviceId());
- flowTable.computeIfPresent(rule.id(), (flowId, flowEntries) -> {
- flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
- if (rule instanceof DefaultFlowEntry) {
- DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
- if (stored instanceof DefaultFlowEntry) {
- DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
- if (toRemove.created() < storedEntry.created()) {
- log.debug("Trying to remove more recent flow entry {} (stored: {})", toRemove, stored);
- // the key is not updated, removedRule remains null
- return stored;
- }
- }
- }
- removedRule.set(stored);
- return null;
- });
- return flowEntries.isEmpty() ? null : flowEntries;
- });
-
- if (removedRule.get() != null) {
- recordUpdate(new BucketId(rule.deviceId(), bucket(rule.id())));
- return removedRule.get();
- } else {
- return null;
- }
+ return Tools.futureGetOrElse(
+ getFlowTable(rule.deviceId()).remove(rule),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ null);
}
+ /**
+ * Purges flow rules for the given device.
+ *
+ * @param deviceId the device for which to purge flow rules
+ */
public void purgeFlowRule(DeviceId deviceId) {
- flowEntries.remove(deviceId);
+ DeviceFlowTable flowTable = flowTables.remove(deviceId);
+ if (flowTable != null) {
+ flowTable.close();
+ }
}
+ /**
+ * Purges all flow rules from the table.
+ */
public void purgeFlowRules() {
- flowEntries.clear();
- }
-
- /**
- * Returns a boolean indicating whether the local node is the current master for the given device.
- *
- * @param deviceId the device for which to indicate whether the local node is the current master
- * @return indicates whether the local node is the current master for the given device
- */
- private boolean isMasterNode(DeviceId deviceId) {
- NodeId master = replicaInfoManager.getReplicaInfoFor(deviceId).master().orElse(null);
- return Objects.equals(master, clusterService.getLocalNode().id());
- }
-
- /**
- * Returns a boolean indicating whether the local node is a backup for the given device.
- *
- * @param deviceId the device for which to indicate whether the local node is a backup
- * @return indicates whether the local node is a backup for the given device
- */
- private boolean isBackupNode(DeviceId deviceId) {
- List<NodeId> backupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
- int index = backupNodes.indexOf(local);
- return index != -1 && index < backupCount;
- }
-
- /**
- * Backs up all devices to all backup nodes.
- */
- private void backup() {
- for (DeviceId deviceId : getDevices()) {
- backup(deviceId);
+ Iterator<DeviceFlowTable> iterator = flowTables.values().iterator();
+ while (iterator.hasNext()) {
+ iterator.next().close();
+ iterator.remove();
}
}
-
- /**
- * Backs up all buckets in the given device to the given node.
- *
- * @param deviceId the device to back up
- */
- private void backup(DeviceId deviceId) {
- if (!isMasterNode(deviceId)) {
- return;
- }
-
- // Get a list of backup nodes for the device.
- List<NodeId> backupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
- int availableBackupCount = Math.min(backupCount, backupNodes.size());
-
- // If the list of backup nodes is empty, update the flow count.
- if (availableBackupCount == 0) {
- updateDeviceFlowCounts(deviceId);
- } else {
- // Otherwise, iterate through backup nodes and backup the device.
- for (int index = 0; index < availableBackupCount; index++) {
- NodeId backupNode = backupNodes.get(index);
- try {
- backup(deviceId, backupNode);
- } catch (Exception e) {
- log.error("Backup of " + deviceId + " to " + backupNode + " failed", e);
- }
- }
- }
- }
-
- /**
- * Backs up all buckets for the given device to the given node.
- *
- * @param deviceId the device to back up
- * @param nodeId the node to which to back up the device
- */
- private void backup(DeviceId deviceId, NodeId nodeId) {
- final long timestamp = System.currentTimeMillis();
- for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
- BucketId bucketId = new BucketId(deviceId, bucket);
- BackupOperation operation = new BackupOperation(nodeId, bucketId);
- if (startBackup(operation)) {
- backup(operation).whenCompleteAsync((succeeded, error) -> {
- if (error == null && succeeded) {
- succeedBackup(operation, timestamp);
- } else {
- failBackup(operation);
- }
- backup(deviceId, nodeId);
- }, backupSenderExecutor);
- }
- }
- }
-
- /**
- * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
- * <p>
- * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
- * are pending replication for the backup operation.
- *
- * @param operation the operation to start
- * @return indicates whether the given backup operation should be started
- */
- private boolean startBackup(BackupOperation operation) {
- long lastBackupTime = lastBackupTimes.getOrDefault(operation, 0L);
- long lastUpdateTime = lastUpdateTimes.getOrDefault(operation.bucketId(), 0L);
- return lastUpdateTime > 0 && lastBackupTime <= lastUpdateTime && inFlightUpdates.add(operation);
- }
-
- /**
- * Fails the given backup operation.
- *
- * @param operation the backup operation to fail
- */
- private void failBackup(BackupOperation operation) {
- inFlightUpdates.remove(operation);
- }
-
- /**
- * Succeeds the given backup operation.
- * <p>
- * The last backup time for the operation will be updated and the operation will be removed from
- * in-flight updates.
- *
- * @param operation the operation to succeed
- * @param timestamp the timestamp at which the operation was <em>started</em>
- */
- private void succeedBackup(BackupOperation operation, long timestamp) {
- lastBackupTimes.put(operation, timestamp);
- inFlightUpdates.remove(operation);
- }
-
- /**
- * Performs the given backup operation.
- *
- * @param operation the operation to perform
- * @return a future to be completed with a boolean indicating whether the backup operation was successful
- */
- private CompletableFuture<Boolean> backup(BackupOperation operation) {
- log.debug("Sending flowEntries in bucket {} for device {} to {} for backup.",
- operation.bucketId().bucket(), operation.bucketId().deviceId(), operation.nodeId());
- FlowBucket flowBucket = getFlowBucket(operation.bucketId());
-
- CompletableFuture<Boolean> future = new CompletableFuture<>();
- clusterCommunicator.<FlowBucket, Set<FlowId>>sendAndReceive(
- flowBucket,
- FLOW_TABLE_BACKUP,
- serializer::encode,
- serializer::decode,
- operation.nodeId())
- .whenComplete((backedupFlows, error) -> {
- Set<FlowId> flowsNotBackedUp = error != null ?
- flowBucket.table().keySet() :
- Sets.difference(flowBucket.table().keySet(), backedupFlows);
- if (flowsNotBackedUp.size() > 0) {
- log.warn("Failed to backup flows: {}. Reason: {}, Node: {}",
- flowsNotBackedUp, error != null ? error.getMessage() : "none", operation.nodeId());
- }
- future.complete(backedupFlows != null);
- });
-
- updateFlowCounts(flowBucket);
- return future;
- }
-
- /**
- * Handles a flow bucket backup from a remote peer.
- *
- * @param flowBucket the flow bucket to back up
- * @return the set of flows that could not be backed up
- */
- private Set<FlowId> onBackup(FlowBucket flowBucket) {
- log.debug("Received flowEntries for {} bucket {} to backup",
- flowBucket.bucketId().deviceId(), flowBucket.bucketId);
- Set<FlowId> backedupFlows = Sets.newHashSet();
- try {
- // Only process those devices are that not managed by the local node.
- NodeId master = replicaInfoManager.getReplicaInfoFor(flowBucket.bucketId().deviceId())
- .master()
- .orElse(null);
- if (!Objects.equals(local, master)) {
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable =
- getFlowTable(flowBucket.bucketId().deviceId());
- backupFlowTable.putAll(flowBucket.table());
- backupFlowTable.entrySet()
- .removeIf(entry -> isInBucket(entry.getKey(), flowBucket.bucketId().bucket())
- && !flowBucket.table().containsKey(entry.getKey()));
- backedupFlows.addAll(flowBucket.table().keySet());
- recordUpdate(flowBucket.bucketId(), flowBucket.timestamp());
- }
- } catch (Exception e) {
- log.warn("Failure processing backup request", e);
- }
- return backedupFlows;
- }
-
- /**
- * Runs the anti-entropy protocol.
- */
- private void runAntiEntropy() {
- for (DeviceId deviceId : getDevices()) {
- runAntiEntropy(deviceId);
- }
- }
-
- /**
- * Runs the anti-entropy protocol for the given device.
- *
- * @param deviceId the device for which to run the anti-entropy protocol
- */
- private void runAntiEntropy(DeviceId deviceId) {
- if (!isMasterNode(deviceId)) {
- return;
- }
-
- // Get the set of digests for the node.
- Set<FlowBucketDigest> digests = getDigests(deviceId);
-
- // Get a list of backup nodes for the device and compute the real backup count.
- List<NodeId> backupNodes = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
- int availableBackupCount = Math.min(backupCount, backupNodes.size());
-
- // Iterate through backup nodes and run the anti-entropy protocol.
- for (int index = 0; index < availableBackupCount; index++) {
- NodeId backupNode = backupNodes.get(index);
- try {
- runAntiEntropy(deviceId, backupNode, digests);
- } catch (Exception e) {
- log.error("Anti-entropy for " + deviceId + " to " + backupNode + " failed", e);
- }
- }
- }
-
- /**
- * Sends an anti-entropy advertisement to the given node.
- *
- * @param deviceId the device ID for which to send the advertisement
- * @param nodeId the node to which to send the advertisement
- * @param digests the digests to send to the given node
- */
- private void runAntiEntropy(DeviceId deviceId, NodeId nodeId, Set<FlowBucketDigest> digests) {
- log.trace("Sending anti-entropy advertisement for device {} to {}", deviceId, nodeId);
- clusterCommunicator.<Set<FlowBucketDigest>, Set<BucketId>>sendAndReceive(
- digests,
- FLOW_TABLE_ANTI_ENTROPY,
- serializer::encode,
- serializer::decode,
- nodeId)
- .whenComplete((missingBuckets, error) -> {
- if (error == null) {
- log.debug("Detected {} missing buckets on node {} for device {}",
- missingBuckets.size(), nodeId, deviceId);
- } else {
- log.trace("Anti-entropy advertisement for device {} to {} failed", deviceId, nodeId, error);
- }
- });
- }
-
- /**
- * Handles a device anti-entropy request from a remote peer.
- *
- * @param digest the device digest
- * @return the set of flow buckets to update
- */
- private Set<BucketId> onAntiEntropy(DeviceDigest digest) {
- // If the local node is the master, reject the anti-entropy request.
- // TODO: We really should be using mastership terms in anti-entropy requests to determine whether
- // this node is a newer master, but that would only reduce the time it takes to resolve missing flows
- // as a later anti-entropy request will still succeed once this node recognizes it's no longer the master.
- NodeId master = replicaInfoManager.getReplicaInfoFor(digest.deviceId())
- .master()
- .orElse(null);
- if (Objects.equals(master, local)) {
- return ImmutableSet.of();
- }
-
- // Compute a set of missing BucketIds based on digest times and send them back to the master.
- Set<BucketId> missingBuckets = new HashSet<>();
- for (FlowBucketDigest flowBucketDigest : digest.digests()) {
- long lastUpdated = lastUpdateTimes.getOrDefault(flowBucketDigest.bucketId(), 0L);
- if (lastUpdated < flowBucketDigest.timestamp()) {
- missingBuckets.add(flowBucketDigest.bucketId());
- }
- }
- return missingBuckets;
- }
-
- /**
- * Updates all flow counts for the given device.
- *
- * @param deviceId the device for which to update flow counts
- */
- private void updateDeviceFlowCounts(DeviceId deviceId) {
- for (int bucket = 0; bucket < NUM_BUCKETS; bucket++) {
- BucketId bucketId = new BucketId(deviceId, bucket);
- FlowBucket flowBucket = getFlowBucket(bucketId);
- updateFlowCounts(flowBucket);
- }
- }
-
- /**
- * Updates the eventually consistent flow count for the given bucket.
- *
- * @param flowBucket the flow bucket for which to update flow counts
- */
- private void updateFlowCounts(FlowBucket flowBucket) {
- int flowCount = flowBucket.table().entrySet()
- .stream()
- .mapToInt(e -> e.getValue().values().size())
- .sum();
- flowCounts.put(flowBucket.bucketId(), flowCount);
- }
}
@Override
@@ -1395,16 +865,114 @@
@Override
public long getActiveFlowRuleCount(DeviceId deviceId) {
return Streams.stream(getTableStatistics(deviceId))
- .mapToLong(TableStatisticsEntry::activeFlowEntries)
- .sum();
+ .mapToLong(TableStatisticsEntry::activeFlowEntries)
+ .sum();
}
private class InternalTableStatsListener
implements EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> {
@Override
public void event(EventuallyConsistentMapEvent<DeviceId,
- List<TableStatisticsEntry>> event) {
+ List<TableStatisticsEntry>> event) {
//TODO: Generate an event to listeners (do we need?)
}
}
+
+ /**
+ * Device lifecycle manager implementation.
+ */
+ private final class InternalLifecycleManager
+ extends AbstractListenerManager<LifecycleEvent, LifecycleEventListener>
+ implements LifecycleManager, ReplicaInfoEventListener, MapEventListener<DeviceId, Long> {
+
+ private final DeviceId deviceId;
+
+ private volatile DeviceReplicaInfo replicaInfo;
+
+ InternalLifecycleManager(DeviceId deviceId) {
+ this.deviceId = deviceId;
+ replicaInfoManager.addListener(this);
+ mastershipTermLifecycles.addListener(this);
+ replicaInfo = toDeviceReplicaInfo(replicaInfoManager.getReplicaInfoFor(deviceId));
+ }
+
+ @Override
+ public DeviceReplicaInfo getReplicaInfo() {
+ return replicaInfo;
+ }
+
+ @Override
+ public void activate(long term) {
+ final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+ if (replicaInfo != null && replicaInfo.term() == term) {
+ mastershipTermLifecycles.put(deviceId, term);
+ }
+ }
+
+ @Override
+ public void event(ReplicaInfoEvent event) {
+ if (event.subject().equals(deviceId) && event.type() == ReplicaInfoEvent.Type.MASTER_CHANGED) {
+ onReplicaInfoChange(event.replicaInfo());
+ }
+ }
+
+ @Override
+ public void event(MapEvent<DeviceId, Long> event) {
+ if (event.key().equals(deviceId) && event.newValue() != null) {
+ onActivate(event.newValue().value());
+ }
+ }
+
+ /**
+ * Handles a term activation event.
+ *
+ * @param term the term that was activated
+ */
+ private void onActivate(long term) {
+ final ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
+ if (replicaInfo != null && replicaInfo.term() == term) {
+ NodeId master = replicaInfo.master().orElse(null);
+ List<NodeId> backups = replicaInfo.backups()
+ .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
+ listenerRegistry.process(new LifecycleEvent(
+ LifecycleEvent.Type.TERM_ACTIVE,
+ new DeviceReplicaInfo(term, master, backups)));
+ }
+ }
+
+ /**
+ * Handles a replica info change event.
+ *
+ * @param replicaInfo the updated replica info
+ */
+ private synchronized void onReplicaInfoChange(ReplicaInfo replicaInfo) {
+ DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
+ this.replicaInfo = toDeviceReplicaInfo(replicaInfo);
+ if (oldReplicaInfo == null || oldReplicaInfo.term() < replicaInfo.term()) {
+ if (oldReplicaInfo != null) {
+ listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_END, oldReplicaInfo));
+ }
+ listenerRegistry.process(new LifecycleEvent(LifecycleEvent.Type.TERM_START, this.replicaInfo));
+ }
+ }
+
+ /**
+ * Converts the given replica info into a {@link DeviceReplicaInfo} instance.
+ *
+ * @param replicaInfo the replica info to convert
+ * @return the converted replica info
+ */
+ private DeviceReplicaInfo toDeviceReplicaInfo(ReplicaInfo replicaInfo) {
+ NodeId master = replicaInfo.master().orElse(null);
+ List<NodeId> backups = replicaInfo.backups()
+ .subList(0, Math.min(replicaInfo.backups().size(), backupCount));
+ return new DeviceReplicaInfo(replicaInfo.term(), master, backups);
+ }
+
+ @Override
+ public void close() {
+ replicaInfoManager.removeListener(this);
+ mastershipTermLifecycles.removeListener(this);
+ }
+ }
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
index 254f5df..c57e9eb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
@@ -32,6 +32,9 @@
public static final MessageSubject GET_DEVICE_FLOW_ENTRIES
= new MessageSubject("peer-forward-get-device-flow-entries");
+ public static final MessageSubject GET_DEVICE_FLOW_COUNT
+ = new MessageSubject("peer-forward-get-flow-count");
+
public static final MessageSubject REMOVE_FLOW_ENTRY
= new MessageSubject("peer-forward-remove-flow-entry");
@@ -40,7 +43,4 @@
public static final MessageSubject FLOW_TABLE_BACKUP
= new MessageSubject("peer-flow-table-backup");
-
- public static final MessageSubject FLOW_TABLE_ANTI_ENTROPY
- = new MessageSubject("peer-flow-table-anti-entropy");
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
new file mode 100644
index 0000000..2205b0b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import com.google.common.collect.Maps;
+import org.onosproject.net.flow.DefaultFlowEntry;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.store.LogicalTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Container for a bucket of flows assigned to a specific device.
+ * <p>
+ * The bucket is mutable. When changes are made to the bucket, the term and timestamp in which the change
+ * occurred is recorded for ordering changes.
+ */
+public class FlowBucket {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlowBucket.class);
+ private final BucketId bucketId;
+ private volatile long term;
+ private volatile LogicalTimestamp timestamp = new LogicalTimestamp(0);
+ private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowBucket = Maps.newConcurrentMap();
+
+ FlowBucket(BucketId bucketId) {
+ this.bucketId = bucketId;
+ }
+
+ /**
+ * Returns the flow bucket identifier.
+ *
+ * @return the flow bucket identifier
+ */
+ public BucketId bucketId() {
+ return bucketId;
+ }
+
+ /**
+ * Returns the flow bucket term.
+ *
+ * @return the flow bucket term
+ */
+ public long term() {
+ return term;
+ }
+
+ /**
+ * Returns the flow bucket timestamp.
+ *
+ * @return the flow bucket timestamp
+ */
+ public LogicalTimestamp timestamp() {
+ return timestamp;
+ }
+
+ /**
+ * Returns the digest for the bucket.
+ *
+ * @return the digest for the bucket
+ */
+ public FlowBucketDigest getDigest() {
+ return new FlowBucketDigest(bucketId().bucket(), term(), timestamp());
+ }
+
+ /**
+ * Returns the flow entries in the bucket.
+ *
+ * @return the flow entries in the bucket
+ */
+ public Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowBucket() {
+ return flowBucket;
+ }
+
+ /**
+ * Returns the flow entries for the given flow.
+ *
+ * @param flowId the flow identifier
+ * @return the flows for the given flow ID
+ */
+ public Map<StoredFlowEntry, StoredFlowEntry> getFlowEntries(FlowId flowId) {
+ Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(flowId);
+ return flowEntries != null ? flowEntries : flowBucket.computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
+ }
+
+ /**
+ * Counts the flows in the bucket.
+ *
+ * @return the number of flows in the bucket
+ */
+ public int count() {
+ return flowBucket.values()
+ .stream()
+ .mapToInt(entry -> entry.values().size())
+ .sum();
+ }
+
+ /**
+ * Records an update to the bucket.
+ */
+ private void recordUpdate(long term, LogicalTimestamp timestamp) {
+ this.term = term;
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * Adds the given flow rule to the bucket.
+ *
+ * @param rule the rule to add
+ * @param term the term in which the change occurred
+ * @param clock the logical clock
+ */
+ public void add(FlowEntry rule, long term, LogicalClock clock) {
+ Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
+ if (flowEntries == null) {
+ flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
+ }
+ flowEntries.put((StoredFlowEntry) rule, (StoredFlowEntry) rule);
+ recordUpdate(term, clock.getTimestamp());
+ }
+
+ /**
+ * Updates the given flow rule in the bucket.
+ *
+ * @param rule the rule to update
+ * @param term the term in which the change occurred
+ * @param clock the logical clock
+ */
+ public void update(FlowEntry rule, long term, LogicalClock clock) {
+ Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
+ if (flowEntries == null) {
+ flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
+ }
+ flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
+ if (rule instanceof DefaultFlowEntry) {
+ DefaultFlowEntry updated = (DefaultFlowEntry) rule;
+ if (stored instanceof DefaultFlowEntry) {
+ DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
+ if (updated.created() >= storedEntry.created()) {
+ recordUpdate(term, clock.getTimestamp());
+ return updated;
+ } else {
+ LOGGER.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
+ return stored;
+ }
+ }
+ }
+ return stored;
+ });
+ }
+
+ /**
+ * Applies the given update function to the rule.
+ *
+ * @param rule the rule to update
+ * @param function the update function to apply
+ * @param term the term in which the change occurred
+ * @param clock the logical clock
+ * @param <T> the result type
+ * @return the update result or {@code null} if the rule was not updated
+ */
+ public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function, long term, LogicalClock clock) {
+ Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
+ if (flowEntries == null) {
+ flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
+ }
+
+ AtomicReference<T> resultRef = new AtomicReference<>();
+ flowEntries.computeIfPresent(new DefaultFlowEntry(rule), (k, stored) -> {
+ if (stored != null) {
+ T result = function.apply(stored);
+ if (result != null) {
+ recordUpdate(term, clock.getTimestamp());
+ resultRef.set(result);
+ }
+ }
+ return stored;
+ });
+ return resultRef.get();
+ }
+
+ /**
+ * Removes the given flow rule from the bucket.
+ *
+ * @param rule the rule to remove
+ * @param term the term in which the change occurred
+ * @param clock the logical clock
+ * @return the removed flow entry
+ */
+ public FlowEntry remove(FlowEntry rule, long term, LogicalClock clock) {
+ final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
+ flowBucket.computeIfPresent(rule.id(), (flowId, flowEntries) -> {
+ flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
+ if (rule instanceof DefaultFlowEntry) {
+ DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
+ if (stored instanceof DefaultFlowEntry) {
+ DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
+ if (toRemove.created() < storedEntry.created()) {
+ LOGGER.debug("Trying to remove more recent flow entry {} (stored: {})", toRemove, stored);
+ // the key is not updated, removedRule remains null
+ return stored;
+ }
+ }
+ }
+ removedRule.set(stored);
+ return null;
+ });
+ return flowEntries.isEmpty() ? null : flowEntries;
+ });
+
+ if (removedRule.get() != null) {
+ recordUpdate(term, clock.getTimestamp());
+ return removedRule.get();
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Clears the bucket.
+ */
+ public void clear() {
+ term = 0;
+ timestamp = new LogicalTimestamp(0);
+ flowBucket.clear();
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucketDigest.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucketDigest.java
new file mode 100644
index 0000000..b5453e5
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucketDigest.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.Objects;
+
+import org.onosproject.store.LogicalTimestamp;
+
+/**
+ * Flow bucket digest.
+ */
+public class FlowBucketDigest {
+ private final int bucket;
+ private final long term;
+ private final LogicalTimestamp timestamp;
+
+ FlowBucketDigest(int bucket, long term, LogicalTimestamp timestamp) {
+ this.bucket = bucket;
+ this.term = term;
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * Returns the bucket identifier.
+ *
+ * @return the bucket identifier
+ */
+ public int bucket() {
+ return bucket;
+ }
+
+ /**
+ * Returns the bucket term.
+ *
+ * @return the bucket term
+ */
+ public long term() {
+ return term;
+ }
+
+ /**
+ * Returns the bucket timestamp.
+ *
+ * @return the bucket timestamp
+ */
+ public LogicalTimestamp timestamp() {
+ return timestamp;
+ }
+
+ /**
+ * Returns a boolean indicating whether this digest is newer than the given digest.
+ *
+ * @param digest the digest to check
+ * @return indicates whether this digest is newer than the given digest
+ */
+ public boolean isNewerThan(FlowBucketDigest digest) {
+ return digest == null || term() > digest.term() || timestamp().isNewerThan(digest.timestamp());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(bucket);
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ return object instanceof FlowBucketDigest
+ && ((FlowBucketDigest) object).bucket == bucket;
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEvent.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEvent.java
new file mode 100644
index 0000000..29672d0
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEvent.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import org.onosproject.event.AbstractEvent;
+
+/**
+ * Flow table lifecycle event.
+ */
+public class LifecycleEvent extends AbstractEvent<LifecycleEvent.Type, DeviceReplicaInfo> {
+
+ /**
+ * Lifecycle event type.
+ */
+ public enum Type {
+ TERM_START,
+ TERM_ACTIVE,
+ TERM_END,
+ }
+
+ public LifecycleEvent(Type type, DeviceReplicaInfo subject) {
+ super(type, subject);
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEventListener.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEventListener.java
new file mode 100644
index 0000000..a2f3b3d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import org.onosproject.event.EventListener;
+
+/**
+ * Flow table lifecycle event listener.
+ */
+public interface LifecycleEventListener extends EventListener<LifecycleEvent> {
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleManager.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleManager.java
new file mode 100644
index 0000000..213454c
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LifecycleManager.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import org.onosproject.event.ListenerService;
+
+/**
+ * Flow table lifecycle manager.
+ */
+public interface LifecycleManager extends ListenerService<LifecycleEvent, LifecycleEventListener> {
+
+ /**
+ * Returns the current term.
+ *
+ * @return the current term
+ */
+ DeviceReplicaInfo getReplicaInfo();
+
+ /**
+ * Activates the given term.
+ *
+ * @param term the term to activate
+ */
+ void activate(long term);
+
+ /**
+ * Closes the lifecycle manager.
+ */
+ void close();
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LogicalClock.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LogicalClock.java
new file mode 100644
index 0000000..92d39cf
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/LogicalClock.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.onosproject.store.LogicalTimestamp;
+
+/**
+ * Logical clock.
+ */
+public final class LogicalClock {
+ private final AtomicLong timestamp = new AtomicLong();
+
+ /**
+ * Records an event.
+ *
+ * @param timestamp the event timestamp
+ */
+ public void tick(LogicalTimestamp timestamp) {
+ this.timestamp.accumulateAndGet(timestamp.value(), (x, y) -> Math.max(x, y) + 1);
+ }
+
+ /**
+ * Returns a timestamped value.
+ *
+ * @param value the value to timestamp
+ * @param <T> the value type
+ * @return the timestamped value
+ */
+ public <T> Timestamped<T> timestamp(T value) {
+ return new Timestamped<>(value, getTimestamp());
+ }
+
+ /**
+ * Increments and returns the current timestamp.
+ *
+ * @return the current timestamp
+ */
+ public LogicalTimestamp getTimestamp() {
+ return new LogicalTimestamp(timestamp.incrementAndGet());
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/Timestamped.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/Timestamped.java
new file mode 100644
index 0000000..e7f566b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/Timestamped.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.flow.impl;
+
+import org.onosproject.store.LogicalTimestamp;
+
+/**
+ * Timestamped value.
+ */
+public class Timestamped<T> {
+ private final T value;
+ private final LogicalTimestamp timestamp;
+
+ public Timestamped(T value, LogicalTimestamp timestamp) {
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * Returns the value.
+ *
+ * @return the value
+ */
+ public T value() {
+ return value;
+ }
+
+ /**
+ * Returns the timestamp.
+ *
+ * @return the timestamp
+ */
+ public LogicalTimestamp timestamp() {
+ return timestamp;
+ }
+}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
index 7fd0412..72562f9 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
@@ -16,6 +16,7 @@
package org.onosproject.store.flow.impl;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import org.junit.After;
import org.junit.Before;
@@ -26,6 +27,7 @@
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.net.device.DeviceServiceAdapter;
import org.onosproject.net.DeviceId;
@@ -40,10 +42,16 @@
import org.onosproject.net.intent.IntentTestsMocks;
import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
import org.onosproject.store.persistence.PersistenceServiceAdapter;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncConsistentMapAdapter;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.TestStorageService;
import org.onlab.packet.Ip4Address;
import java.util.Iterator;
+import java.util.Optional;
+
import org.osgi.service.component.ComponentContext;
import static org.easymock.EasyMock.createMock;
@@ -103,6 +111,16 @@
public NodeId getMasterFor(DeviceId deviceId) {
return new NodeId("1");
}
+
+ @Override
+ public MastershipInfo getMastershipFor(DeviceId deviceId) {
+ return new MastershipInfo(
+ 1,
+ Optional.of(NodeId.nodeId("1")),
+ ImmutableMap.<NodeId, MastershipRole>builder()
+ .put(NodeId.nodeId("1"), MastershipRole.MASTER)
+ .build());
+ }
}
@@ -132,8 +150,27 @@
@Before
public void setUp() throws Exception {
flowStoreImpl = new ECFlowRuleStore();
- flowStoreImpl.storageService = new TestStorageService();
- flowStoreImpl.replicaInfoManager = new ReplicaInfoManager();
+ flowStoreImpl.storageService = new TestStorageService() {
+ @Override
+ public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
+ return new ConsistentMapBuilder<K, V>() {
+ @Override
+ public AsyncConsistentMap<K, V> buildAsyncMap() {
+ return new AsyncConsistentMapAdapter<K, V>();
+ }
+
+ @Override
+ public ConsistentMap<K, V> build() {
+ return null;
+ }
+ };
+ }
+ };
+
+ ReplicaInfoManager replicaInfoManager = new ReplicaInfoManager();
+ replicaInfoManager.mastershipService = new MasterOfAll();
+
+ flowStoreImpl.replicaInfoManager = replicaInfoManager;
mockClusterService = createMock(ClusterService.class);
flowStoreImpl.clusterService = mockClusterService;
nodeId = new NodeId("1");