blob: a257cbd60a3d6eb8454187388044227225243092 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.flow.impl;
17
Jordan Halterman43300782019-05-21 11:27:50 -070018import java.time.Duration;
Jordan Halterman281dbf32018-06-15 17:46:28 -070019import java.util.Collection;
Jordan Halterman01f3bdd2019-04-17 11:05:16 -070020import java.util.Collections;
Jordan Halterman281dbf32018-06-15 17:46:28 -070021import java.util.LinkedList;
22import java.util.List;
23import java.util.Map;
24import java.util.Queue;
25import java.util.Set;
26import java.util.concurrent.CompletableFuture;
Jordan Haltermanaeda2752019-02-22 12:31:25 -080027import java.util.concurrent.Executor;
Jordan Halterman281dbf32018-06-15 17:46:28 -070028import java.util.concurrent.ScheduledExecutorService;
29import java.util.concurrent.ScheduledFuture;
30import java.util.concurrent.TimeUnit;
31import java.util.function.BiFunction;
32import java.util.function.Function;
33import java.util.stream.Collectors;
34
Jordan Halterman01f3bdd2019-04-17 11:05:16 -070035import com.google.common.collect.Iterables;
Daniele Moro43ac2892021-07-15 17:02:59 +020036import com.google.common.collect.Lists;
Jordan Halterman281dbf32018-06-15 17:46:28 -070037import com.google.common.collect.Maps;
38import com.google.common.collect.Sets;
39import org.onlab.util.KryoNamespace;
40import org.onlab.util.Tools;
41import org.onosproject.cluster.ClusterService;
42import org.onosproject.cluster.NodeId;
Daniele Moro43ac2892021-07-15 17:02:59 +020043import org.onosproject.core.ApplicationId;
Jordan Halterman281dbf32018-06-15 17:46:28 -070044import org.onosproject.net.DeviceId;
Pier Luigi Ventred8a923c2020-02-20 11:25:31 +000045import org.onosproject.net.device.DeviceService;
Jordan Halterman281dbf32018-06-15 17:46:28 -070046import org.onosproject.net.flow.FlowEntry;
47import org.onosproject.net.flow.FlowId;
48import org.onosproject.net.flow.FlowRule;
Pier Luigi Ventred8a923c2020-02-20 11:25:31 +000049import org.onosproject.net.flow.FlowRuleStoreException;
Daniele Moro43ac2892021-07-15 17:02:59 +020050import org.onosproject.net.flow.StoredFlowEntry;
Jordan Halterman281dbf32018-06-15 17:46:28 -070051import org.onosproject.store.LogicalTimestamp;
52import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
53import org.onosproject.store.cluster.messaging.MessageSubject;
54import org.onosproject.store.serializers.KryoNamespaces;
55import org.onosproject.store.service.Serializer;
56import org.slf4j.Logger;
57import org.slf4j.LoggerFactory;
58
/**
 * Flow table for all flows associated with a specific device.
 * <p>
 * Flows in the table are stored in buckets. Each bucket is mutated and replicated as a single unit. The device flow
 * table performs communication independent of other device flow tables for more parallelism.
 * <p>
 * This implementation uses several different replication protocols. Changes that occur on the device master are
 * replicated to the backups provided in the {@link DeviceReplicaInfo} for the master's term. Additionally, a periodic
 * anti-entropy protocol is used to detect missing flows on backups (e.g. due to a node restart). Finally, when a
 * device mastership change occurs, the new master synchronizes flows with the prior master and/or backups for the
 * device, allowing mastership to be reassigned to non-backup nodes.
 */
71public class DeviceFlowTable {
Jordan Haltermanaeda2752019-02-22 12:31:25 -080072 private static final int NUM_BUCKETS = 128;
Jordan Halterman281dbf32018-06-15 17:46:28 -070073 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
74 .register(KryoNamespaces.API)
75 .register(BucketId.class)
76 .register(FlowBucket.class)
77 .register(FlowBucketDigest.class)
78 .register(LogicalTimestamp.class)
79 .register(Timestamped.class)
80 .build());
Jordan Halterman43300782019-05-21 11:27:50 -070081 private static final int GET_FLOW_ENTRIES_TIMEOUT = 15; // seconds
Jordan Halterman281dbf32018-06-15 17:46:28 -070082
83 private final Logger log = LoggerFactory.getLogger(getClass());
84
85 private final MessageSubject getDigestsSubject;
86 private final MessageSubject getBucketSubject;
87 private final MessageSubject backupSubject;
Jordan Halterman01f3bdd2019-04-17 11:05:16 -070088 private final MessageSubject getFlowsSubject;
Jordan Halterman281dbf32018-06-15 17:46:28 -070089
90 private final DeviceId deviceId;
91 private final ClusterCommunicationService clusterCommunicator;
Andrea Campanella5daa7c462020-03-13 12:04:23 +010092 private final ClusterService clusterService;
Pier Luigi Ventred8a923c2020-02-20 11:25:31 +000093 private final DeviceService deviceService;
Jordan Halterman281dbf32018-06-15 17:46:28 -070094 private final LifecycleManager lifecycleManager;
Jordan Haltermanaeda2752019-02-22 12:31:25 -080095 private final ScheduledExecutorService scheduler;
96 private final Executor executor;
Jordan Halterman281dbf32018-06-15 17:46:28 -070097 private final NodeId localNodeId;
98
99 private final LogicalClock clock = new LogicalClock();
100
101 private volatile DeviceReplicaInfo replicaInfo;
102 private volatile long activeTerm;
103
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800104 private long backupPeriod;
105
Jordan Halterman281dbf32018-06-15 17:46:28 -0700106 private final LifecycleEventListener lifecycleEventListener = new LifecycleEventListener() {
107 @Override
108 public void event(LifecycleEvent event) {
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800109 executor.execute(() -> onLifecycleEvent(event));
Jordan Halterman281dbf32018-06-15 17:46:28 -0700110 }
111 };
112
Jordan Halterman281dbf32018-06-15 17:46:28 -0700113 private ScheduledFuture<?> antiEntropyFuture;
114
115 private final Map<Integer, Queue<Runnable>> flowTasks = Maps.newConcurrentMap();
116 private final Map<Integer, FlowBucket> flowBuckets = Maps.newConcurrentMap();
117
118 private final Map<BackupOperation, LogicalTimestamp> lastBackupTimes = Maps.newConcurrentMap();
119 private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
120
/**
 * Creates the flow table for the given device.
 * <p>
 * The constructor stores its collaborators, creates one empty bucket per bucket number, derives the
 * per-device message subjects, registers listeners and subscribers, starts the backup and anti-entropy
 * schedules and finally activates the current term if the local node is the master.
 *
 * @param deviceId            the device this table belongs to
 * @param clusterService      the cluster service, used to resolve the local node
 * @param clusterCommunicator the communication service used to talk to other nodes
 * @param lifecycleManager    the source of replica information for the device
 * @param deviceService       the device service
 * @param scheduler           the scheduler used for delayed and periodic tasks
 * @param executor            the executor on which scheduled work is actually run
 * @param backupPeriod        the backup period in milliseconds
 * @param antiEntropyPeriod   the anti-entropy period in milliseconds
 */
DeviceFlowTable(
        DeviceId deviceId,
        ClusterService clusterService,
        ClusterCommunicationService clusterCommunicator,
        LifecycleManager lifecycleManager,
        DeviceService deviceService,
        ScheduledExecutorService scheduler,
        Executor executor,
        long backupPeriod,
        long antiEntropyPeriod) {
    this.deviceId = deviceId;
    this.clusterCommunicator = clusterCommunicator;
    this.clusterService = clusterService;
    this.lifecycleManager = lifecycleManager;
    this.deviceService = deviceService;
    this.scheduler = scheduler;
    this.executor = executor;
    this.localNodeId = clusterService.getLocalNode().id();
    this.replicaInfo = lifecycleManager.getReplicaInfo();

    // Every bucket exists from the start, so lookups by bucket number never miss.
    for (int bucketNumber = 0; bucketNumber < NUM_BUCKETS; bucketNumber++) {
        BucketId bucketId = new BucketId(deviceId, bucketNumber);
        flowBuckets.put(bucketNumber, new FlowBucket(bucketId));
    }

    getDigestsSubject = new MessageSubject(String.format("flow-store-%s-digests", deviceId));
    getBucketSubject = new MessageSubject(String.format("flow-store-%s-bucket", deviceId));
    backupSubject = new MessageSubject(String.format("flow-store-%s-backup", deviceId));
    getFlowsSubject = new MessageSubject(String.format("flow-store-%s-flows", deviceId));

    // The order of the following calls is kept exactly as in the original implementation.
    addListeners();

    setBackupPeriod(backupPeriod);
    setAntiEntropyPeriod(antiEntropyPeriod);
    registerSubscribers();

    scheduleBackups();

    activateMaster(replicaInfo);
}
160
161 /**
162 * Sets the flow table backup period.
163 *
164 * @param backupPeriod the flow table backup period in milliseconds
165 */
166 synchronized void setBackupPeriod(long backupPeriod) {
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800167 this.backupPeriod = backupPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700168 }
169
170 /**
171 * Sets the flow table anti-entropy period.
172 *
173 * @param antiEntropyPeriod the flow table anti-entropy period in milliseconds
174 */
175 synchronized void setAntiEntropyPeriod(long antiEntropyPeriod) {
176 ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
177 if (antiEntropyFuture != null) {
178 antiEntropyFuture.cancel(false);
179 }
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800180 this.antiEntropyFuture = scheduler.scheduleAtFixedRate(
181 () -> executor.execute(this::runAntiEntropy),
182 antiEntropyPeriod,
183 antiEntropyPeriod,
184 TimeUnit.MILLISECONDS);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700185 }
186
187 /**
188 * Counts the flows in the table.
189 *
190 * @return the total number of flows in the table
191 */
192 public int count() {
193 return flowBuckets.values().stream()
194 .mapToInt(FlowBucket::count)
195 .sum();
196 }
197
198 /**
199 * Returns the flow entry for the given rule.
200 *
201 * @param rule the rule for which to lookup the flow entry
202 * @return the flow entry for the given rule
203 */
204 public StoredFlowEntry getFlowEntry(FlowRule rule) {
205 return getBucket(rule.id())
206 .getFlowEntries(rule.id())
207 .get(rule);
208 }
209
210 /**
211 * Returns the set of flow entries in the table.
212 *
213 * @return the set of flow entries in the table
214 */
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700215 public CompletableFuture<Iterable<FlowEntry>> getFlowEntries() {
216 // Fetch the entries for each bucket in parallel and then concatenate the sets
217 // to create a single iterable.
218 return Tools.allOf(flowBuckets.values()
219 .stream()
220 .map(this::getFlowEntries)
221 .collect(Collectors.toList()))
222 .thenApply(Iterables::concat);
223 }
224
225 /**
226 * Fetches the set of flow entries in the given bucket.
227 *
228 * @param bucketId the bucket for which to fetch flow entries
229 * @return a future to be completed once the flow entries have been retrieved
230 */
231 private CompletableFuture<Set<FlowEntry>> getFlowEntries(BucketId bucketId) {
232 return getFlowEntries(getBucket(bucketId.bucket()));
233 }
234
235 /**
236 * Fetches the set of flow entries in the given bucket.
237 *
238 * @param bucket the bucket for which to fetch flow entries
239 * @return a future to be completed once the flow entries have been retrieved
240 */
241 private CompletableFuture<Set<FlowEntry>> getFlowEntries(FlowBucket bucket) {
242 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700243 // If the local node is the master, fetch the entries locally. Otherwise, request the entries
244 // from the current master. Note that there's a change of a brief cycle during a mastership change.
245 if (replicaInfo.isMaster(localNodeId)) {
246 return CompletableFuture.completedFuture(
247 bucket.getFlowBucket().values().stream()
248 .flatMap(entries -> entries.values().stream())
249 .collect(Collectors.toSet()));
250 } else if (replicaInfo.master() != null) {
251 return clusterCommunicator.sendAndReceive(
252 bucket.bucketId(),
253 getFlowsSubject,
254 SERIALIZER::encode,
255 SERIALIZER::decode,
Jordan Halterman43300782019-05-21 11:27:50 -0700256 replicaInfo.master(),
257 Duration.ofSeconds(GET_FLOW_ENTRIES_TIMEOUT));
Pier Luigi Ventred8a923c2020-02-20 11:25:31 +0000258 } else if (deviceService.isAvailable(deviceId)) {
259 throw new FlowRuleStoreException("There is no master for available device " + deviceId);
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100260 } else if (clusterService.getNodes().size() <= 1 + ECFlowRuleStore.backupCount) {
261 //TODO remove this check when [ONOS-8080] is fixed
262 //When device is not available and has no master and
263 // the number of nodes surpasses the guaranteed backup count,
264 // we are certain that this node has a replica.
265 // -- DISCLAIMER --
266 // You manually need to set the backup count for clusters > 3 nodes,
267 // the default is 2, which handles the single instance and 3 node scenarios
268 return CompletableFuture.completedFuture(
269 bucket.getFlowBucket().values().stream()
270 .flatMap(entries -> entries.values().stream())
271 .collect(Collectors.toSet()));
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700272 } else {
273 return CompletableFuture.completedFuture(Collections.emptySet());
274 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700275 }
276
277 /**
278 * Returns the bucket for the given flow identifier.
279 *
280 * @param flowId the flow identifier
281 * @return the bucket for the given flow identifier
282 */
283 private FlowBucket getBucket(FlowId flowId) {
284 return getBucket(bucket(flowId));
285 }
286
287 /**
288 * Returns the bucket with the given identifier.
289 *
290 * @param bucketId the bucket identifier
291 * @return the bucket with the given identifier
292 */
293 private FlowBucket getBucket(int bucketId) {
294 return flowBuckets.get(bucketId);
295 }
296
297 /**
298 * Returns the bucket number for the given flow identifier.
299 *
300 * @param flowId the flow identifier
301 * @return the bucket number for the given flow identifier
302 */
303 private int bucket(FlowId flowId) {
304 return Math.abs((int) (flowId.id() % NUM_BUCKETS));
305 }
306
307 /**
308 * Returns the digests for all buckets in the flow table for the device.
309 *
310 * @return the set of digests for all buckets for the device
311 */
312 private Set<FlowBucketDigest> getDigests() {
313 return flowBuckets.values()
314 .stream()
315 .map(bucket -> bucket.getDigest())
316 .collect(Collectors.toSet());
317 }
318
319 /**
320 * Returns the digest for the given bucket.
321 *
322 * @param bucket the bucket for which to return the digest
323 * @return the digest for the given bucket
324 */
325 private FlowBucketDigest getDigest(int bucket) {
326 return flowBuckets.get(bucket).getDigest();
327 }
328
329 /**
330 * Adds an entry to the table.
331 *
332 * @param rule the rule to add
333 * @return a future to be completed once the rule has been added
334 */
335 public CompletableFuture<Void> add(FlowEntry rule) {
336 return runInTerm(rule.id(), (bucket, term) -> {
337 bucket.add(rule, term, clock);
338 return null;
339 });
340 }
341
342 /**
343 * Updates an entry in the table.
344 *
345 * @param rule the rule to update
346 * @return a future to be completed once the rule has been updated
347 */
348 public CompletableFuture<Void> update(FlowEntry rule) {
349 return runInTerm(rule.id(), (bucket, term) -> {
350 bucket.update(rule, term, clock);
351 return null;
352 });
353 }
354
355 /**
356 * Applies the given update function to the rule.
357 *
358 * @param rule the rule to update
359 * @param function the update function to apply
360 * @param <T> the result type
361 * @return a future to be completed with the update result or {@code null} if the rule was not updated
362 */
363 public <T> CompletableFuture<T> update(FlowRule rule, Function<StoredFlowEntry, T> function) {
364 return runInTerm(rule.id(), (bucket, term) -> bucket.update(rule, function, term, clock));
365 }
366
367 /**
368 * Removes an entry from the table.
369 *
370 * @param rule the rule to remove
371 * @return a future to be completed once the rule has been removed
372 */
373 public CompletableFuture<FlowEntry> remove(FlowEntry rule) {
374 return runInTerm(rule.id(), (bucket, term) -> bucket.remove(rule, term, clock));
375 }
376
377 /**
378 * Runs the given function in the current term.
379 *
380 * @param flowId the flow identifier indicating the bucket in which to run the function
381 * @param function the function to execute in the current term
382 * @param <T> the future result type
383 * @return a future to be completed with the function result once it has been run
384 */
385 private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> function) {
Jordan Halterman07093b02019-03-11 13:30:12 -0700386 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
387 if (!replicaInfo.isMaster(localNodeId)) {
388 return Tools.exceptionalFuture(new IllegalStateException());
389 }
390
391 FlowBucket bucket = getBucket(flowId);
392
393 // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
394 // the change to be executed once the master has been synchronized.
395 final long term = replicaInfo.term();
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800396 CompletableFuture<T> future = new CompletableFuture<>();
Jordan Halterman07093b02019-03-11 13:30:12 -0700397 if (activeTerm < term) {
398 log.debug("Enqueueing operation for device {}", deviceId);
399 flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
400 .add(() -> future.complete(apply(function, bucket, term)));
401 } else {
402 future.complete(apply(function, bucket, term));
403 }
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800404 return future;
405 }
406
407 /**
Jordan Halterman07093b02019-03-11 13:30:12 -0700408 * Applies the given function to the given bucket.
409 *
410 * @param function the function to apply
411 * @param bucket the bucket to which to apply the function
412 * @param term the term in which to apply the function
413 * @param <T> the expected result type
414 * @return the function result
415 */
416 private <T> T apply(BiFunction<FlowBucket, Long, T> function, FlowBucket bucket, long term) {
417 synchronized (bucket) {
418 return function.apply(bucket, term);
419 }
420 }
421
422 /**
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800423 * Schedules bucket backups.
424 */
425 private void scheduleBackups() {
426 flowBuckets.values().forEach(bucket -> backupBucket(bucket).whenComplete((result, error) -> {
427 scheduleBackup(bucket);
428 }));
429 }
430
431 /**
432 * Schedules a backup for the given bucket.
433 *
434 * @param bucket the bucket for which to schedule the backup
435 */
436 private void scheduleBackup(FlowBucket bucket) {
437 scheduler.schedule(
438 () -> executor.execute(() -> backupBucket(bucket)),
439 backupPeriod,
440 TimeUnit.MILLISECONDS);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700441 }
442
443 /**
444 * Backs up all buckets in the given device to the given node.
445 */
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800446 private CompletableFuture<Void> backupAll() {
447 CompletableFuture<?>[] futures = flowBuckets.values()
448 .stream()
449 .map(bucket -> backupBucket(bucket))
450 .toArray(CompletableFuture[]::new);
451 return CompletableFuture.allOf(futures);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700452 }
453
454 /**
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800455 * Backs up the given flow bucket.
Jordan Halterman281dbf32018-06-15 17:46:28 -0700456 *
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800457 * @param bucket the flow bucket to backup
Jordan Halterman281dbf32018-06-15 17:46:28 -0700458 */
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800459 private CompletableFuture<Void> backupBucket(FlowBucket bucket) {
460 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
Jordan Halterman281dbf32018-06-15 17:46:28 -0700461
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800462 // Only replicate if the bucket's term matches the replica term and the local node is the current master.
463 // This ensures that the bucket has been synchronized prior to a new master replicating changes to backups.
464 // Only replicate if the local node is the current master.
465 if (bucket.term() == replicaInfo.term() && replicaInfo.isMaster(localNodeId)) {
466 // Replicate the bucket to each of the backup nodes.
467 CompletableFuture<?>[] futures = replicaInfo.backups()
468 .stream()
469 .map(nodeId -> backupBucketToNode(bucket, nodeId))
470 .toArray(CompletableFuture[]::new);
471 return CompletableFuture.allOf(futures);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700472 }
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800473 return CompletableFuture.completedFuture(null);
474 }
475
476 /**
477 * Backs up the given flow bucket to the given node.
478 *
479 * @param bucket the bucket to backup
480 * @param nodeId the node to which to back up the bucket
481 * @return a future to be completed once the bucket has been backed up
482 */
483 private CompletableFuture<Void> backupBucketToNode(FlowBucket bucket, NodeId nodeId) {
484 // Record the logical timestamp from the bucket to keep track of the highest logical time replicated.
485 LogicalTimestamp timestamp = bucket.timestamp();
486
487 // If the backup can be run (no concurrent backup to the node in progress) then run it.
488 BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
489 if (startBackup(operation, timestamp)) {
490 CompletableFuture<Void> future = new CompletableFuture<>();
491 backup(bucket, nodeId).whenCompleteAsync((succeeded, error) -> {
492 if (error != null) {
493 log.debug("Backup operation {} failed", operation, error);
494 failBackup(operation);
495 } else if (succeeded) {
496 succeedBackup(operation, timestamp);
497 } else {
498 log.debug("Backup operation {} failed: term mismatch", operation);
499 failBackup(operation);
500 }
501 future.complete(null);
502 }, executor);
503 return future;
504 }
505 return CompletableFuture.completedFuture(null);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700506 }
507
508 /**
509 * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
510 * <p>
511 * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
512 * are pending replication for the backup operation.
513 *
514 * @param operation the operation to start
515 * @param timestamp the timestamp for which to start the backup operation
516 * @return indicates whether the given backup operation should be started
517 */
518 private boolean startBackup(BackupOperation operation, LogicalTimestamp timestamp) {
519 LogicalTimestamp lastBackupTime = lastBackupTimes.get(operation);
520 return timestamp != null
521 && (lastBackupTime == null || lastBackupTime.isOlderThan(timestamp))
522 && inFlightUpdates.add(operation);
523 }
524
525 /**
526 * Fails the given backup operation.
527 *
528 * @param operation the backup operation to fail
529 */
530 private void failBackup(BackupOperation operation) {
531 inFlightUpdates.remove(operation);
532 }
533
534 /**
535 * Succeeds the given backup operation.
536 * <p>
537 * The last backup time for the operation will be updated and the operation will be removed from
538 * in-flight updates.
539 *
540 * @param operation the operation to succeed
541 * @param timestamp the timestamp at which the operation was <em>started</em>
542 */
543 private void succeedBackup(BackupOperation operation, LogicalTimestamp timestamp) {
544 lastBackupTimes.put(operation, timestamp);
545 inFlightUpdates.remove(operation);
546 }
547
548 /**
549 * Resets the last completion time for the given backup operation to ensure it's replicated again.
550 *
551 * @param operation the backup operation to reset
552 */
553 private void resetBackup(BackupOperation operation) {
554 lastBackupTimes.remove(operation);
555 }
556
557 /**
558 * Performs the given backup operation.
559 *
560 * @param bucket the bucket to backup
561 * @param nodeId the node to which to backup the bucket
562 * @return a future to be completed with a boolean indicating whether the backup operation was successful
563 */
564 private CompletableFuture<Boolean> backup(FlowBucket bucket, NodeId nodeId) {
565 if (log.isDebugEnabled()) {
566 log.debug("Backing up {} flow entries in bucket {} to {}", bucket.count(), bucket.bucketId(), nodeId);
567 }
Jordan Halterman07093b02019-03-11 13:30:12 -0700568 synchronized (bucket) {
569 return sendWithTimestamp(bucket, backupSubject, nodeId);
570 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700571 }
572
573 /**
574 * Handles a flow bucket backup from a remote peer.
575 *
576 * @param flowBucket the flow bucket to back up
577 * @return the set of flows that could not be backed up
578 */
579 private boolean onBackup(FlowBucket flowBucket) {
580 if (log.isDebugEnabled()) {
581 log.debug("{} - Received {} flow entries in bucket {} to backup",
582 deviceId, flowBucket.count(), flowBucket.bucketId());
583 }
584
585 try {
586 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
587
588 // If the backup is for a different term, reject the request until we learn about the new term.
589 if (flowBucket.term() != replicaInfo.term()) {
590 log.debug("Term mismatch for device {}: {} != {}", deviceId, flowBucket.term(), replicaInfo);
591 return false;
592 }
593
594 flowBuckets.compute(flowBucket.bucketId().bucket(),
595 (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
596 return true;
597 } catch (Exception e) {
598 log.warn("Failure processing backup request", e);
599 return false;
600 }
601 }
602
603 /**
604 * Runs the anti-entropy protocol.
605 */
606 private void runAntiEntropy() {
607 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
608 if (!replicaInfo.isMaster(localNodeId)) {
609 return;
610 }
611
612 for (NodeId nodeId : replicaInfo.backups()) {
613 runAntiEntropy(nodeId);
614 }
615 }
616
617 /**
618 * Runs the anti-entropy protocol against the given peer.
619 *
620 * @param nodeId the node with which to execute the anti-entropy protocol
621 */
622 private void runAntiEntropy(NodeId nodeId) {
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800623 backupAll().whenCompleteAsync((result, error) -> {
624 requestDigests(nodeId).thenAcceptAsync((digests) -> {
625 // Compute a set of missing BucketIds based on digest times and send them back to the master.
626 for (FlowBucketDigest remoteDigest : digests) {
627 FlowBucket localBucket = getBucket(remoteDigest.bucket());
628 if (localBucket.getDigest().isNewerThan(remoteDigest)) {
629 log.debug("Detected missing flow entries on node {} in bucket {}/{}",
630 nodeId, deviceId, remoteDigest.bucket());
631 resetBackup(new BackupOperation(nodeId, remoteDigest.bucket()));
632 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700633 }
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800634 }, executor);
635 }, executor);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700636 }
637
638 /**
639 * Sends a digest request to the given node.
640 *
641 * @param nodeId the node to which to send the request
642 * @return future to be completed with the set of digests for the given device on the given node
643 */
644 private CompletableFuture<Set<FlowBucketDigest>> requestDigests(NodeId nodeId) {
645 return sendWithTimestamp(deviceId, getDigestsSubject, nodeId);
646 }
647
648 /**
649 * Synchronizes flows from the previous master or backups.
650 *
651 * @param prevReplicaInfo the previous replica info
652 * @param newReplicaInfo the new replica info
653 */
654 private void syncFlows(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
655 if (prevReplicaInfo == null) {
656 activateMaster(newReplicaInfo);
657 } else if (prevReplicaInfo.master() != null && !prevReplicaInfo.master().equals(localNodeId)) {
658 syncFlowsOnMaster(prevReplicaInfo, newReplicaInfo);
659 } else {
660 syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
661 }
662 }
663
664 /**
665 * Synchronizes flows from the previous master, falling back to backups if the master fails.
666 *
667 * @param prevReplicaInfo the previous replica info
668 * @param newReplicaInfo the new replica info
669 */
670 private void syncFlowsOnMaster(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
671 syncFlowsOn(prevReplicaInfo.master())
672 .whenCompleteAsync((result, error) -> {
673 if (error != null) {
674 log.debug("Failed to synchronize flows on previous master {}", prevReplicaInfo.master(), error);
675 syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
676 } else {
677 activateMaster(newReplicaInfo);
678 }
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800679 }, executor);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700680 }
681
682 /**
683 * Synchronizes flows from the previous backups.
684 *
685 * @param prevReplicaInfo the previous replica info
686 * @param newReplicaInfo the new replica info
687 */
688 private void syncFlowsOnBackups(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
689 List<NodeId> backups = prevReplicaInfo.backups()
690 .stream()
691 .filter(nodeId -> !nodeId.equals(localNodeId))
692 .collect(Collectors.toList());
693 syncFlowsOn(backups)
694 .whenCompleteAsync((result, error) -> {
695 if (error != null) {
696 log.debug("Failed to synchronize flows on previous backup nodes {}", backups, error);
697 }
698 activateMaster(newReplicaInfo);
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800699 }, executor);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700700 }
701
702 /**
703 * Synchronizes flows for the device on the given nodes.
704 *
705 * @param nodes the nodes via which to synchronize the flows
706 * @return a future to be completed once flows have been synchronizes
707 */
708 private CompletableFuture<Void> syncFlowsOn(Collection<NodeId> nodes) {
709 return nodes.isEmpty()
710 ? CompletableFuture.completedFuture(null)
711 : Tools.firstOf(nodes.stream()
712 .map(node -> syncFlowsOn(node))
713 .collect(Collectors.toList()))
714 .thenApply(v -> null);
715 }
716
717 /**
718 * Synchronizes flows for the device from the given node.
719 *
720 * @param nodeId the node from which to synchronize flows
721 * @return a future to be completed once the flows have been synchronizes
722 */
723 private CompletableFuture<Void> syncFlowsOn(NodeId nodeId) {
724 return requestDigests(nodeId)
725 .thenCompose(digests -> Tools.allOf(digests.stream()
726 .filter(digest -> digest.isNewerThan(getDigest(digest.bucket())))
727 .map(digest -> syncBucketOn(nodeId, digest.bucket()))
728 .collect(Collectors.toList())))
729 .thenApply(v -> null);
730 }
731
732 /**
733 * Synchronizes the given bucket on the given node.
734 *
735 * @param nodeId the node on which to synchronize the bucket
736 * @param bucketNumber the bucket to synchronize
737 * @return a future to be completed once the bucket has been synchronizes
738 */
739 private CompletableFuture<Void> syncBucketOn(NodeId nodeId, int bucketNumber) {
740 return requestBucket(nodeId, bucketNumber)
741 .thenAcceptAsync(flowBucket -> {
742 flowBuckets.compute(flowBucket.bucketId().bucket(),
743 (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800744 }, executor);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700745 }
746
747 /**
748 * Requests the given bucket from the given node.
749 *
750 * @param nodeId the node from which to request the bucket
751 * @param bucket the bucket to request
752 * @return a future to be completed with the bucket
753 */
754 private CompletableFuture<FlowBucket> requestBucket(NodeId nodeId, int bucket) {
755 log.debug("Requesting flow bucket {} from {}", bucket, nodeId);
756 return sendWithTimestamp(bucket, getBucketSubject, nodeId);
757 }
758
759 /**
760 * Handles a flow bucket request.
761 *
Jordan Halterman158feb92018-06-30 23:44:30 -0700762 * @param bucketId the bucket number
Jordan Halterman281dbf32018-06-15 17:46:28 -0700763 * @return the flow bucket
764 */
Jordan Halterman158feb92018-06-30 23:44:30 -0700765 private FlowBucket onGetBucket(int bucketId) {
766 return flowBuckets.get(bucketId).copy();
Jordan Halterman281dbf32018-06-15 17:46:28 -0700767 }
768
769 /**
770 * Activates the new master term.
771 *
772 * @param replicaInfo the new replica info
773 */
774 private void activateMaster(DeviceReplicaInfo replicaInfo) {
Jordan Haltermane3de3212019-03-08 10:48:22 -0800775 if (replicaInfo.isMaster(localNodeId)) {
776 log.debug("Activating term {} for device {}", replicaInfo.term(), deviceId);
777 for (int i = 0; i < NUM_BUCKETS; i++) {
778 activateBucket(i);
779 }
780 lifecycleManager.activate(replicaInfo.term());
781 activeTerm = replicaInfo.term();
Jordan Halterman281dbf32018-06-15 17:46:28 -0700782 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700783 }
784
785 /**
786 * Activates the given bucket number.
787 *
788 * @param bucket the bucket number to activate
789 */
790 private void activateBucket(int bucket) {
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800791 Queue<Runnable> tasks = flowTasks.remove(bucket);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700792 if (tasks != null) {
793 log.debug("Completing enqueued operations for device {}", deviceId);
794 tasks.forEach(task -> task.run());
795 }
796 }
797
798 /**
799 * Handles a lifecycle event.
800 */
801 private void onLifecycleEvent(LifecycleEvent event) {
802 log.debug("Received lifecycle event for device {}: {}", deviceId, event);
803 switch (event.type()) {
804 case TERM_START:
805 startTerm(event.subject());
806 break;
807 case TERM_ACTIVE:
808 activateTerm(event.subject());
809 break;
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700810 case TERM_UPDATE:
811 updateTerm(event.subject());
812 break;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700813 default:
814 break;
815 }
816 }
817
818 /**
819 * Handles a replica change at the start of a new term.
820 */
821 private void startTerm(DeviceReplicaInfo replicaInfo) {
822 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
823 this.replicaInfo = replicaInfo;
824 if (replicaInfo.isMaster(localNodeId)) {
825 log.info("Synchronizing device {} flows for term {}", deviceId, replicaInfo.term());
826 syncFlows(oldReplicaInfo, replicaInfo);
827 }
828 }
829
830 /**
831 * Handles the activation of a term.
832 */
833 private void activateTerm(DeviceReplicaInfo replicaInfo) {
834 if (replicaInfo.term() < this.replicaInfo.term()) {
835 return;
836 }
837 if (replicaInfo.term() > this.replicaInfo.term()) {
838 this.replicaInfo = replicaInfo;
839 }
840
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100841 // If the local node is neither the master or a backup for the device,
842 // and the number of nodes surpasses the guaranteed backup count, clear the flow table.
843 if (!replicaInfo.isMaster(localNodeId) && !replicaInfo.isBackup(localNodeId) &&
844 (clusterService.getNodes().size() > 1 + ECFlowRuleStore.backupCount)) {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700845 flowBuckets.values().forEach(bucket -> bucket.clear());
846 }
847 activeTerm = replicaInfo.term();
848 }
849
850 /**
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700851 * Handles an update to a term.
852 */
853 private void updateTerm(DeviceReplicaInfo replicaInfo) {
Jordan Halterman0677d882019-03-07 15:52:34 -0800854 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
855 if (oldReplicaInfo != null && replicaInfo.term() == oldReplicaInfo.term()) {
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700856 this.replicaInfo = replicaInfo;
857
858 // If the local node is neither the master or a backup for the device *and the term is active*,
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100859 // and the number of nodes surpasses the guaranteed backup count, clear the flow table.
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700860 if (activeTerm == replicaInfo.term()
861 && !replicaInfo.isMaster(localNodeId)
Andrea Campanella5daa7c462020-03-13 12:04:23 +0100862 && !replicaInfo.isBackup(localNodeId)
863 && (clusterService.getNodes().size() > 1 + ECFlowRuleStore.backupCount)) {
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700864 flowBuckets.values().forEach(bucket -> bucket.clear());
865 }
866 }
867 }
868
869 /**
Jordan Halterman281dbf32018-06-15 17:46:28 -0700870 * Sends a message to the given node wrapped in a Lamport timestamp.
871 * <p>
872 * Messages are sent in a {@link Timestamped} wrapper and are expected to be received in a {@link Timestamped}
873 * wrapper. The internal {@link LogicalClock} is automatically updated on both send and receive.
874 *
875 * @param message the message to send
876 * @param subject the message subject
877 * @param toNodeId the node to which to send the message
878 * @param <M> the message type
879 * @param <R> the response type
880 * @return a future to be completed with the response
881 */
882 private <M, R> CompletableFuture<R> sendWithTimestamp(M message, MessageSubject subject, NodeId toNodeId) {
883 return clusterCommunicator.<Timestamped<M>, Timestamped<R>>sendAndReceive(
884 clock.timestamp(message), subject, SERIALIZER::encode, SERIALIZER::decode, toNodeId)
885 .thenApply(response -> {
886 clock.tick(response.timestamp());
887 return response.value();
888 });
889 }
890
891 /**
892 * Receives messages to the given subject wrapped in Lamport timestamps.
893 * <p>
894 * Messages are expected to be received in a {@link Timestamped} wrapper and are sent back in a {@link Timestamped}
895 * wrapper. The internal {@link LogicalClock} is automatically updated on both receive and send.
896 *
897 * @param subject the subject for which to register the subscriber
898 * @param function the raw message handler
899 * @param <M> the raw message type
900 * @param <R> the raw response type
901 */
902 private <M, R> void receiveWithTimestamp(MessageSubject subject, Function<M, R> function) {
903 clusterCommunicator.<Timestamped<M>, Timestamped<R>>addSubscriber(subject, SERIALIZER::decode, request -> {
904 clock.tick(request.timestamp());
905 return clock.timestamp(function.apply(request.value()));
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800906 }, SERIALIZER::encode, executor);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700907 }
908
909 /**
910 * Registers internal message subscribers.
911 */
912 private void registerSubscribers() {
913 receiveWithTimestamp(getDigestsSubject, v -> getDigests());
914 receiveWithTimestamp(getBucketSubject, this::onGetBucket);
915 receiveWithTimestamp(backupSubject, this::onBackup);
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700916 clusterCommunicator.<BucketId, Set<FlowEntry>>addSubscriber(
917 getFlowsSubject, SERIALIZER::decode, this::getFlowEntries, SERIALIZER::encode);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700918 }
919
920 /**
921 * Unregisters internal message subscribers.
922 */
923 private void unregisterSubscribers() {
924 clusterCommunicator.removeSubscriber(getDigestsSubject);
925 clusterCommunicator.removeSubscriber(getBucketSubject);
926 clusterCommunicator.removeSubscriber(backupSubject);
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700927 clusterCommunicator.removeSubscriber(getFlowsSubject);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700928 }
929
930 /**
931 * Adds internal event listeners.
932 */
933 private void addListeners() {
934 lifecycleManager.addListener(lifecycleEventListener);
935 }
936
937 /**
938 * Removes internal event listeners.
939 */
940 private void removeListeners() {
941 lifecycleManager.removeListener(lifecycleEventListener);
942 }
943
944 /**
945 * Cancels recurrent scheduled futures.
946 */
947 private synchronized void cancelFutures() {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700948 ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
949 if (antiEntropyFuture != null) {
950 antiEntropyFuture.cancel(false);
951 }
952 }
953
954 /**
Jordan Haltermanda3b9f02018-10-05 13:28:51 -0700955 * Purges the flow table.
956 */
957 public void purge() {
958 flowTasks.clear();
959 flowBuckets.values().forEach(bucket -> bucket.purge());
960 lastBackupTimes.clear();
961 inFlightUpdates.clear();
962 }
963
964 /**
Daniele Moro43ac2892021-07-15 17:02:59 +0200965 * Purges the flows with the given application id.
966 *
967 * @param appId the application id
968 * @return a future to be completed once flow rules with given application
969 * id have been purged on all buckets
970 */
971 public CompletableFuture<Void> purge(ApplicationId appId) {
972 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
973 if (!replicaInfo.isMaster(localNodeId)) {
974 return Tools.exceptionalFuture(new IllegalStateException());
975 }
976
977 // If the master's term is not currently active (has not been synchronized
978 // with prior replicas), enqueue the changes to be executed once the master
979 // has been synchronized.
980 final long term = replicaInfo.term();
981 List<CompletableFuture<Void>> completablePurges = Lists.newArrayList();
982 if (activeTerm < term) {
983 log.debug("Enqueueing operations for device {}", deviceId);
984 flowBuckets.values().forEach(
985 bucket -> {
986 CompletableFuture<Void> future = new CompletableFuture<>();
987 completablePurges.add(future);
988 flowTasks.computeIfAbsent(bucket.bucketId().bucket(),
989 b -> new LinkedList<>())
990 .add(() -> future.complete(apply((bkt, trm) -> {
991 bkt.purge(appId, trm, clock);
992 return null;
993 }, bucket, term)));
994 });
995
996 } else {
997 flowBuckets.values().forEach(bucket -> {
998 CompletableFuture<Void> future = new CompletableFuture<>();
999 completablePurges.add(future);
1000 future.complete(apply((bkt, trm) -> {
1001 bkt.purge(appId, trm, clock);
1002 return null;
1003 }, bucket, term));
1004 });
1005 }
1006 return CompletableFuture.allOf(completablePurges.toArray(new CompletableFuture[0]));
1007 }
1008
1009 /**
Jordan Halterman281dbf32018-06-15 17:46:28 -07001010 * Closes the device flow table.
1011 */
1012 public void close() {
1013 removeListeners();
1014 unregisterSubscribers();
1015 cancelFutures();
1016 lifecycleManager.close();
1017 }
1018}