blob: f3206c3e55162be3c63401071ab248d5fc247143 [file] [log] [blame]
Jordan Halterman281dbf32018-06-15 17:46:28 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
Jordan Halterman43300782019-05-21 11:27:50 -070018import java.time.Duration;
Jordan Halterman281dbf32018-06-15 17:46:28 -070019import java.util.Collection;
Jordan Halterman01f3bdd2019-04-17 11:05:16 -070020import java.util.Collections;
Jordan Halterman281dbf32018-06-15 17:46:28 -070021import java.util.LinkedList;
22import java.util.List;
23import java.util.Map;
24import java.util.Queue;
25import java.util.Set;
26import java.util.concurrent.CompletableFuture;
Jordan Haltermanaeda2752019-02-22 12:31:25 -080027import java.util.concurrent.Executor;
Jordan Halterman281dbf32018-06-15 17:46:28 -070028import java.util.concurrent.ScheduledExecutorService;
29import java.util.concurrent.ScheduledFuture;
30import java.util.concurrent.TimeUnit;
31import java.util.function.BiFunction;
32import java.util.function.Function;
33import java.util.stream.Collectors;
34
Jordan Halterman01f3bdd2019-04-17 11:05:16 -070035import com.google.common.collect.Iterables;
Jordan Halterman281dbf32018-06-15 17:46:28 -070036import com.google.common.collect.Maps;
37import com.google.common.collect.Sets;
38import org.onlab.util.KryoNamespace;
39import org.onlab.util.Tools;
40import org.onosproject.cluster.ClusterService;
41import org.onosproject.cluster.NodeId;
42import org.onosproject.net.DeviceId;
43import org.onosproject.net.flow.FlowEntry;
44import org.onosproject.net.flow.FlowId;
45import org.onosproject.net.flow.FlowRule;
46import org.onosproject.net.flow.StoredFlowEntry;
47import org.onosproject.store.LogicalTimestamp;
48import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
49import org.onosproject.store.cluster.messaging.MessageSubject;
50import org.onosproject.store.serializers.KryoNamespaces;
51import org.onosproject.store.service.Serializer;
52import org.slf4j.Logger;
53import org.slf4j.LoggerFactory;
54
55/**
56 * Flow table for all flows associated with a specific device.
57 * <p>
58 * Flows in the table are stored in buckets. Each bucket is mutated and replicated as a single unit. The device flow
59 * table performs communication independent of other device flow tables for more parallelism.
60 * <p>
61 * This implementation uses several different replication protocols. Changes that occur on the device master are
62 * replicated to the backups provided in the {@link DeviceReplicaInfo} for the master's term. Additionally, a periodic
63 * anti-entropy protocol is used to detect missing flows on backups (e.g. due to a node restart). Finally, when a
64 * device mastership change occurs, the new master synchronizes flows with the prior master and/or backups for the
65 * device, allowing mastership to be reassigned to non-backup nodes.
66 */
67public class DeviceFlowTable {
Jordan Haltermanaeda2752019-02-22 12:31:25 -080068 private static final int NUM_BUCKETS = 128;
Jordan Halterman281dbf32018-06-15 17:46:28 -070069 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
70 .register(KryoNamespaces.API)
71 .register(BucketId.class)
72 .register(FlowBucket.class)
73 .register(FlowBucketDigest.class)
74 .register(LogicalTimestamp.class)
75 .register(Timestamped.class)
76 .build());
Jordan Halterman43300782019-05-21 11:27:50 -070077 private static final int GET_FLOW_ENTRIES_TIMEOUT = 15; // seconds
Jordan Halterman281dbf32018-06-15 17:46:28 -070078
79 private final Logger log = LoggerFactory.getLogger(getClass());
80
81 private final MessageSubject getDigestsSubject;
82 private final MessageSubject getBucketSubject;
83 private final MessageSubject backupSubject;
Jordan Halterman01f3bdd2019-04-17 11:05:16 -070084 private final MessageSubject getFlowsSubject;
Jordan Halterman281dbf32018-06-15 17:46:28 -070085
86 private final DeviceId deviceId;
87 private final ClusterCommunicationService clusterCommunicator;
88 private final LifecycleManager lifecycleManager;
Jordan Haltermanaeda2752019-02-22 12:31:25 -080089 private final ScheduledExecutorService scheduler;
90 private final Executor executor;
Jordan Halterman281dbf32018-06-15 17:46:28 -070091 private final NodeId localNodeId;
92
93 private final LogicalClock clock = new LogicalClock();
94
95 private volatile DeviceReplicaInfo replicaInfo;
96 private volatile long activeTerm;
97
Jordan Haltermanaeda2752019-02-22 12:31:25 -080098 private long backupPeriod;
99
Jordan Halterman281dbf32018-06-15 17:46:28 -0700100 private final LifecycleEventListener lifecycleEventListener = new LifecycleEventListener() {
101 @Override
102 public void event(LifecycleEvent event) {
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800103 executor.execute(() -> onLifecycleEvent(event));
Jordan Halterman281dbf32018-06-15 17:46:28 -0700104 }
105 };
106
Jordan Halterman281dbf32018-06-15 17:46:28 -0700107 private ScheduledFuture<?> antiEntropyFuture;
108
109 private final Map<Integer, Queue<Runnable>> flowTasks = Maps.newConcurrentMap();
110 private final Map<Integer, FlowBucket> flowBuckets = Maps.newConcurrentMap();
111
112 private final Map<BackupOperation, LogicalTimestamp> lastBackupTimes = Maps.newConcurrentMap();
113 private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
114
/**
 * Creates the flow table for a single device.
 * <p>
 * The constructor creates every bucket, registers the listeners and message subscribers, starts the
 * backup and anti-entropy tasks and finally activates the current term if the local node is the master.
 *
 * @param deviceId the device whose flows are stored in this table
 * @param clusterService the cluster service, used to resolve the local node identifier
 * @param clusterCommunicator the service used to exchange messages with other nodes
 * @param lifecycleManager the manager providing replica info for the device
 * @param scheduler the executor on which delayed and periodic tasks are scheduled
 * @param executor the executor on which flow table work is run
 * @param backupPeriod the flow table backup period in milliseconds
 * @param antiEntropyPeriod the flow table anti-entropy period in milliseconds
 */
DeviceFlowTable(
        DeviceId deviceId,
        ClusterService clusterService,
        ClusterCommunicationService clusterCommunicator,
        LifecycleManager lifecycleManager,
        ScheduledExecutorService scheduler,
        Executor executor,
        long backupPeriod,
        long antiEntropyPeriod) {
    this.deviceId = deviceId;
    this.clusterCommunicator = clusterCommunicator;
    this.lifecycleManager = lifecycleManager;
    this.scheduler = scheduler;
    this.executor = executor;
    this.localNodeId = clusterService.getLocalNode().id();
    this.replicaInfo = lifecycleManager.getReplicaInfo();

    // Every bucket exists from the start so that lookups by bucket number always find one.
    for (int bucketNumber = 0; bucketNumber < NUM_BUCKETS; bucketNumber++) {
        BucketId bucketId = new BucketId(deviceId, bucketNumber);
        flowBuckets.put(bucketNumber, new FlowBucket(bucketId));
    }

    this.getDigestsSubject = new MessageSubject(String.format("flow-store-%s-digests", deviceId));
    this.getBucketSubject = new MessageSubject(String.format("flow-store-%s-bucket", deviceId));
    this.backupSubject = new MessageSubject(String.format("flow-store-%s-backup", deviceId));
    this.getFlowsSubject = new MessageSubject(String.format("flow-store-%s-flows", deviceId));

    addListeners();

    setBackupPeriod(backupPeriod);
    setAntiEntropyPeriod(antiEntropyPeriod);
    registerSubscribers();

    scheduleBackups();

    activateMaster(this.replicaInfo);
}
151
152 /**
153 * Sets the flow table backup period.
154 *
155 * @param backupPeriod the flow table backup period in milliseconds
156 */
157 synchronized void setBackupPeriod(long backupPeriod) {
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800158 this.backupPeriod = backupPeriod;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700159 }
160
161 /**
162 * Sets the flow table anti-entropy period.
163 *
164 * @param antiEntropyPeriod the flow table anti-entropy period in milliseconds
165 */
166 synchronized void setAntiEntropyPeriod(long antiEntropyPeriod) {
167 ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
168 if (antiEntropyFuture != null) {
169 antiEntropyFuture.cancel(false);
170 }
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800171 this.antiEntropyFuture = scheduler.scheduleAtFixedRate(
172 () -> executor.execute(this::runAntiEntropy),
173 antiEntropyPeriod,
174 antiEntropyPeriod,
175 TimeUnit.MILLISECONDS);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700176 }
177
178 /**
179 * Counts the flows in the table.
180 *
181 * @return the total number of flows in the table
182 */
183 public int count() {
184 return flowBuckets.values().stream()
185 .mapToInt(FlowBucket::count)
186 .sum();
187 }
188
189 /**
190 * Returns the flow entry for the given rule.
191 *
192 * @param rule the rule for which to lookup the flow entry
193 * @return the flow entry for the given rule
194 */
195 public StoredFlowEntry getFlowEntry(FlowRule rule) {
196 return getBucket(rule.id())
197 .getFlowEntries(rule.id())
198 .get(rule);
199 }
200
201 /**
202 * Returns the set of flow entries in the table.
203 *
204 * @return the set of flow entries in the table
205 */
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700206 public CompletableFuture<Iterable<FlowEntry>> getFlowEntries() {
207 // Fetch the entries for each bucket in parallel and then concatenate the sets
208 // to create a single iterable.
209 return Tools.allOf(flowBuckets.values()
210 .stream()
211 .map(this::getFlowEntries)
212 .collect(Collectors.toList()))
213 .thenApply(Iterables::concat);
214 }
215
216 /**
217 * Fetches the set of flow entries in the given bucket.
218 *
219 * @param bucketId the bucket for which to fetch flow entries
220 * @return a future to be completed once the flow entries have been retrieved
221 */
222 private CompletableFuture<Set<FlowEntry>> getFlowEntries(BucketId bucketId) {
223 return getFlowEntries(getBucket(bucketId.bucket()));
224 }
225
226 /**
227 * Fetches the set of flow entries in the given bucket.
228 *
229 * @param bucket the bucket for which to fetch flow entries
230 * @return a future to be completed once the flow entries have been retrieved
231 */
232 private CompletableFuture<Set<FlowEntry>> getFlowEntries(FlowBucket bucket) {
233 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
234
235 // If the local node is the master, fetch the entries locally. Otherwise, request the entries
236 // from the current master. Note that there's a change of a brief cycle during a mastership change.
237 if (replicaInfo.isMaster(localNodeId)) {
238 return CompletableFuture.completedFuture(
239 bucket.getFlowBucket().values().stream()
240 .flatMap(entries -> entries.values().stream())
241 .collect(Collectors.toSet()));
242 } else if (replicaInfo.master() != null) {
243 return clusterCommunicator.sendAndReceive(
244 bucket.bucketId(),
245 getFlowsSubject,
246 SERIALIZER::encode,
247 SERIALIZER::decode,
Jordan Halterman43300782019-05-21 11:27:50 -0700248 replicaInfo.master(),
249 Duration.ofSeconds(GET_FLOW_ENTRIES_TIMEOUT));
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700250 } else {
251 return CompletableFuture.completedFuture(Collections.emptySet());
252 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700253 }
254
255 /**
256 * Returns the bucket for the given flow identifier.
257 *
258 * @param flowId the flow identifier
259 * @return the bucket for the given flow identifier
260 */
261 private FlowBucket getBucket(FlowId flowId) {
262 return getBucket(bucket(flowId));
263 }
264
265 /**
266 * Returns the bucket with the given identifier.
267 *
268 * @param bucketId the bucket identifier
269 * @return the bucket with the given identifier
270 */
271 private FlowBucket getBucket(int bucketId) {
272 return flowBuckets.get(bucketId);
273 }
274
275 /**
276 * Returns the bucket number for the given flow identifier.
277 *
278 * @param flowId the flow identifier
279 * @return the bucket number for the given flow identifier
280 */
281 private int bucket(FlowId flowId) {
282 return Math.abs((int) (flowId.id() % NUM_BUCKETS));
283 }
284
285 /**
286 * Returns the digests for all buckets in the flow table for the device.
287 *
288 * @return the set of digests for all buckets for the device
289 */
290 private Set<FlowBucketDigest> getDigests() {
291 return flowBuckets.values()
292 .stream()
293 .map(bucket -> bucket.getDigest())
294 .collect(Collectors.toSet());
295 }
296
297 /**
298 * Returns the digest for the given bucket.
299 *
300 * @param bucket the bucket for which to return the digest
301 * @return the digest for the given bucket
302 */
303 private FlowBucketDigest getDigest(int bucket) {
304 return flowBuckets.get(bucket).getDigest();
305 }
306
307 /**
308 * Adds an entry to the table.
309 *
310 * @param rule the rule to add
311 * @return a future to be completed once the rule has been added
312 */
313 public CompletableFuture<Void> add(FlowEntry rule) {
314 return runInTerm(rule.id(), (bucket, term) -> {
315 bucket.add(rule, term, clock);
316 return null;
317 });
318 }
319
320 /**
321 * Updates an entry in the table.
322 *
323 * @param rule the rule to update
324 * @return a future to be completed once the rule has been updated
325 */
326 public CompletableFuture<Void> update(FlowEntry rule) {
327 return runInTerm(rule.id(), (bucket, term) -> {
328 bucket.update(rule, term, clock);
329 return null;
330 });
331 }
332
333 /**
334 * Applies the given update function to the rule.
335 *
336 * @param rule the rule to update
337 * @param function the update function to apply
338 * @param <T> the result type
339 * @return a future to be completed with the update result or {@code null} if the rule was not updated
340 */
341 public <T> CompletableFuture<T> update(FlowRule rule, Function<StoredFlowEntry, T> function) {
342 return runInTerm(rule.id(), (bucket, term) -> bucket.update(rule, function, term, clock));
343 }
344
345 /**
346 * Removes an entry from the table.
347 *
348 * @param rule the rule to remove
349 * @return a future to be completed once the rule has been removed
350 */
351 public CompletableFuture<FlowEntry> remove(FlowEntry rule) {
352 return runInTerm(rule.id(), (bucket, term) -> bucket.remove(rule, term, clock));
353 }
354
355 /**
356 * Runs the given function in the current term.
357 *
358 * @param flowId the flow identifier indicating the bucket in which to run the function
359 * @param function the function to execute in the current term
360 * @param <T> the future result type
361 * @return a future to be completed with the function result once it has been run
362 */
363 private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> function) {
Jordan Halterman07093b02019-03-11 13:30:12 -0700364 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
365 if (!replicaInfo.isMaster(localNodeId)) {
366 return Tools.exceptionalFuture(new IllegalStateException());
367 }
368
369 FlowBucket bucket = getBucket(flowId);
370
371 // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
372 // the change to be executed once the master has been synchronized.
373 final long term = replicaInfo.term();
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800374 CompletableFuture<T> future = new CompletableFuture<>();
Jordan Halterman07093b02019-03-11 13:30:12 -0700375 if (activeTerm < term) {
376 log.debug("Enqueueing operation for device {}", deviceId);
377 flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
378 .add(() -> future.complete(apply(function, bucket, term)));
379 } else {
380 future.complete(apply(function, bucket, term));
381 }
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800382 return future;
383 }
384
385 /**
Jordan Halterman07093b02019-03-11 13:30:12 -0700386 * Applies the given function to the given bucket.
387 *
388 * @param function the function to apply
389 * @param bucket the bucket to which to apply the function
390 * @param term the term in which to apply the function
391 * @param <T> the expected result type
392 * @return the function result
393 */
394 private <T> T apply(BiFunction<FlowBucket, Long, T> function, FlowBucket bucket, long term) {
395 synchronized (bucket) {
396 return function.apply(bucket, term);
397 }
398 }
399
400 /**
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800401 * Schedules bucket backups.
402 */
403 private void scheduleBackups() {
404 flowBuckets.values().forEach(bucket -> backupBucket(bucket).whenComplete((result, error) -> {
405 scheduleBackup(bucket);
406 }));
407 }
408
409 /**
410 * Schedules a backup for the given bucket.
411 *
412 * @param bucket the bucket for which to schedule the backup
413 */
414 private void scheduleBackup(FlowBucket bucket) {
415 scheduler.schedule(
416 () -> executor.execute(() -> backupBucket(bucket)),
417 backupPeriod,
418 TimeUnit.MILLISECONDS);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700419 }
420
421 /**
422 * Backs up all buckets in the given device to the given node.
423 */
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800424 private CompletableFuture<Void> backupAll() {
425 CompletableFuture<?>[] futures = flowBuckets.values()
426 .stream()
427 .map(bucket -> backupBucket(bucket))
428 .toArray(CompletableFuture[]::new);
429 return CompletableFuture.allOf(futures);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700430 }
431
432 /**
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800433 * Backs up the given flow bucket.
Jordan Halterman281dbf32018-06-15 17:46:28 -0700434 *
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800435 * @param bucket the flow bucket to backup
Jordan Halterman281dbf32018-06-15 17:46:28 -0700436 */
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800437 private CompletableFuture<Void> backupBucket(FlowBucket bucket) {
438 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
Jordan Halterman281dbf32018-06-15 17:46:28 -0700439
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800440 // Only replicate if the bucket's term matches the replica term and the local node is the current master.
441 // This ensures that the bucket has been synchronized prior to a new master replicating changes to backups.
442 // Only replicate if the local node is the current master.
443 if (bucket.term() == replicaInfo.term() && replicaInfo.isMaster(localNodeId)) {
444 // Replicate the bucket to each of the backup nodes.
445 CompletableFuture<?>[] futures = replicaInfo.backups()
446 .stream()
447 .map(nodeId -> backupBucketToNode(bucket, nodeId))
448 .toArray(CompletableFuture[]::new);
449 return CompletableFuture.allOf(futures);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700450 }
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800451 return CompletableFuture.completedFuture(null);
452 }
453
454 /**
455 * Backs up the given flow bucket to the given node.
456 *
457 * @param bucket the bucket to backup
458 * @param nodeId the node to which to back up the bucket
459 * @return a future to be completed once the bucket has been backed up
460 */
461 private CompletableFuture<Void> backupBucketToNode(FlowBucket bucket, NodeId nodeId) {
462 // Record the logical timestamp from the bucket to keep track of the highest logical time replicated.
463 LogicalTimestamp timestamp = bucket.timestamp();
464
465 // If the backup can be run (no concurrent backup to the node in progress) then run it.
466 BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
467 if (startBackup(operation, timestamp)) {
468 CompletableFuture<Void> future = new CompletableFuture<>();
469 backup(bucket, nodeId).whenCompleteAsync((succeeded, error) -> {
470 if (error != null) {
471 log.debug("Backup operation {} failed", operation, error);
472 failBackup(operation);
473 } else if (succeeded) {
474 succeedBackup(operation, timestamp);
475 } else {
476 log.debug("Backup operation {} failed: term mismatch", operation);
477 failBackup(operation);
478 }
479 future.complete(null);
480 }, executor);
481 return future;
482 }
483 return CompletableFuture.completedFuture(null);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700484 }
485
486 /**
487 * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
488 * <p>
489 * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
490 * are pending replication for the backup operation.
491 *
492 * @param operation the operation to start
493 * @param timestamp the timestamp for which to start the backup operation
494 * @return indicates whether the given backup operation should be started
495 */
496 private boolean startBackup(BackupOperation operation, LogicalTimestamp timestamp) {
497 LogicalTimestamp lastBackupTime = lastBackupTimes.get(operation);
498 return timestamp != null
499 && (lastBackupTime == null || lastBackupTime.isOlderThan(timestamp))
500 && inFlightUpdates.add(operation);
501 }
502
503 /**
504 * Fails the given backup operation.
505 *
506 * @param operation the backup operation to fail
507 */
508 private void failBackup(BackupOperation operation) {
509 inFlightUpdates.remove(operation);
510 }
511
512 /**
513 * Succeeds the given backup operation.
514 * <p>
515 * The last backup time for the operation will be updated and the operation will be removed from
516 * in-flight updates.
517 *
518 * @param operation the operation to succeed
519 * @param timestamp the timestamp at which the operation was <em>started</em>
520 */
521 private void succeedBackup(BackupOperation operation, LogicalTimestamp timestamp) {
522 lastBackupTimes.put(operation, timestamp);
523 inFlightUpdates.remove(operation);
524 }
525
526 /**
527 * Resets the last completion time for the given backup operation to ensure it's replicated again.
528 *
529 * @param operation the backup operation to reset
530 */
531 private void resetBackup(BackupOperation operation) {
532 lastBackupTimes.remove(operation);
533 }
534
535 /**
536 * Performs the given backup operation.
537 *
538 * @param bucket the bucket to backup
539 * @param nodeId the node to which to backup the bucket
540 * @return a future to be completed with a boolean indicating whether the backup operation was successful
541 */
542 private CompletableFuture<Boolean> backup(FlowBucket bucket, NodeId nodeId) {
543 if (log.isDebugEnabled()) {
544 log.debug("Backing up {} flow entries in bucket {} to {}", bucket.count(), bucket.bucketId(), nodeId);
545 }
Jordan Halterman07093b02019-03-11 13:30:12 -0700546 synchronized (bucket) {
547 return sendWithTimestamp(bucket, backupSubject, nodeId);
548 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700549 }
550
551 /**
552 * Handles a flow bucket backup from a remote peer.
553 *
554 * @param flowBucket the flow bucket to back up
555 * @return the set of flows that could not be backed up
556 */
557 private boolean onBackup(FlowBucket flowBucket) {
558 if (log.isDebugEnabled()) {
559 log.debug("{} - Received {} flow entries in bucket {} to backup",
560 deviceId, flowBucket.count(), flowBucket.bucketId());
561 }
562
563 try {
564 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
565
566 // If the backup is for a different term, reject the request until we learn about the new term.
567 if (flowBucket.term() != replicaInfo.term()) {
568 log.debug("Term mismatch for device {}: {} != {}", deviceId, flowBucket.term(), replicaInfo);
569 return false;
570 }
571
572 flowBuckets.compute(flowBucket.bucketId().bucket(),
573 (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
574 return true;
575 } catch (Exception e) {
576 log.warn("Failure processing backup request", e);
577 return false;
578 }
579 }
580
581 /**
582 * Runs the anti-entropy protocol.
583 */
584 private void runAntiEntropy() {
585 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
586 if (!replicaInfo.isMaster(localNodeId)) {
587 return;
588 }
589
590 for (NodeId nodeId : replicaInfo.backups()) {
591 runAntiEntropy(nodeId);
592 }
593 }
594
595 /**
596 * Runs the anti-entropy protocol against the given peer.
597 *
598 * @param nodeId the node with which to execute the anti-entropy protocol
599 */
600 private void runAntiEntropy(NodeId nodeId) {
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800601 backupAll().whenCompleteAsync((result, error) -> {
602 requestDigests(nodeId).thenAcceptAsync((digests) -> {
603 // Compute a set of missing BucketIds based on digest times and send them back to the master.
604 for (FlowBucketDigest remoteDigest : digests) {
605 FlowBucket localBucket = getBucket(remoteDigest.bucket());
606 if (localBucket.getDigest().isNewerThan(remoteDigest)) {
607 log.debug("Detected missing flow entries on node {} in bucket {}/{}",
608 nodeId, deviceId, remoteDigest.bucket());
609 resetBackup(new BackupOperation(nodeId, remoteDigest.bucket()));
610 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700611 }
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800612 }, executor);
613 }, executor);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700614 }
615
616 /**
617 * Sends a digest request to the given node.
618 *
619 * @param nodeId the node to which to send the request
620 * @return future to be completed with the set of digests for the given device on the given node
621 */
622 private CompletableFuture<Set<FlowBucketDigest>> requestDigests(NodeId nodeId) {
623 return sendWithTimestamp(deviceId, getDigestsSubject, nodeId);
624 }
625
626 /**
627 * Synchronizes flows from the previous master or backups.
628 *
629 * @param prevReplicaInfo the previous replica info
630 * @param newReplicaInfo the new replica info
631 */
632 private void syncFlows(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
633 if (prevReplicaInfo == null) {
634 activateMaster(newReplicaInfo);
635 } else if (prevReplicaInfo.master() != null && !prevReplicaInfo.master().equals(localNodeId)) {
636 syncFlowsOnMaster(prevReplicaInfo, newReplicaInfo);
637 } else {
638 syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
639 }
640 }
641
642 /**
643 * Synchronizes flows from the previous master, falling back to backups if the master fails.
644 *
645 * @param prevReplicaInfo the previous replica info
646 * @param newReplicaInfo the new replica info
647 */
648 private void syncFlowsOnMaster(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
649 syncFlowsOn(prevReplicaInfo.master())
650 .whenCompleteAsync((result, error) -> {
651 if (error != null) {
652 log.debug("Failed to synchronize flows on previous master {}", prevReplicaInfo.master(), error);
653 syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
654 } else {
655 activateMaster(newReplicaInfo);
656 }
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800657 }, executor);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700658 }
659
660 /**
661 * Synchronizes flows from the previous backups.
662 *
663 * @param prevReplicaInfo the previous replica info
664 * @param newReplicaInfo the new replica info
665 */
666 private void syncFlowsOnBackups(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
667 List<NodeId> backups = prevReplicaInfo.backups()
668 .stream()
669 .filter(nodeId -> !nodeId.equals(localNodeId))
670 .collect(Collectors.toList());
671 syncFlowsOn(backups)
672 .whenCompleteAsync((result, error) -> {
673 if (error != null) {
674 log.debug("Failed to synchronize flows on previous backup nodes {}", backups, error);
675 }
676 activateMaster(newReplicaInfo);
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800677 }, executor);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700678 }
679
680 /**
681 * Synchronizes flows for the device on the given nodes.
682 *
683 * @param nodes the nodes via which to synchronize the flows
684 * @return a future to be completed once flows have been synchronizes
685 */
686 private CompletableFuture<Void> syncFlowsOn(Collection<NodeId> nodes) {
687 return nodes.isEmpty()
688 ? CompletableFuture.completedFuture(null)
689 : Tools.firstOf(nodes.stream()
690 .map(node -> syncFlowsOn(node))
691 .collect(Collectors.toList()))
692 .thenApply(v -> null);
693 }
694
695 /**
696 * Synchronizes flows for the device from the given node.
697 *
698 * @param nodeId the node from which to synchronize flows
699 * @return a future to be completed once the flows have been synchronizes
700 */
701 private CompletableFuture<Void> syncFlowsOn(NodeId nodeId) {
702 return requestDigests(nodeId)
703 .thenCompose(digests -> Tools.allOf(digests.stream()
704 .filter(digest -> digest.isNewerThan(getDigest(digest.bucket())))
705 .map(digest -> syncBucketOn(nodeId, digest.bucket()))
706 .collect(Collectors.toList())))
707 .thenApply(v -> null);
708 }
709
710 /**
711 * Synchronizes the given bucket on the given node.
712 *
713 * @param nodeId the node on which to synchronize the bucket
714 * @param bucketNumber the bucket to synchronize
715 * @return a future to be completed once the bucket has been synchronizes
716 */
717 private CompletableFuture<Void> syncBucketOn(NodeId nodeId, int bucketNumber) {
718 return requestBucket(nodeId, bucketNumber)
719 .thenAcceptAsync(flowBucket -> {
720 flowBuckets.compute(flowBucket.bucketId().bucket(),
721 (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800722 }, executor);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700723 }
724
725 /**
726 * Requests the given bucket from the given node.
727 *
728 * @param nodeId the node from which to request the bucket
729 * @param bucket the bucket to request
730 * @return a future to be completed with the bucket
731 */
732 private CompletableFuture<FlowBucket> requestBucket(NodeId nodeId, int bucket) {
733 log.debug("Requesting flow bucket {} from {}", bucket, nodeId);
734 return sendWithTimestamp(bucket, getBucketSubject, nodeId);
735 }
736
737 /**
738 * Handles a flow bucket request.
739 *
Jordan Halterman158feb92018-06-30 23:44:30 -0700740 * @param bucketId the bucket number
Jordan Halterman281dbf32018-06-15 17:46:28 -0700741 * @return the flow bucket
742 */
Jordan Halterman158feb92018-06-30 23:44:30 -0700743 private FlowBucket onGetBucket(int bucketId) {
744 return flowBuckets.get(bucketId).copy();
Jordan Halterman281dbf32018-06-15 17:46:28 -0700745 }
746
747 /**
748 * Activates the new master term.
749 *
750 * @param replicaInfo the new replica info
751 */
752 private void activateMaster(DeviceReplicaInfo replicaInfo) {
Jordan Haltermane3de3212019-03-08 10:48:22 -0800753 if (replicaInfo.isMaster(localNodeId)) {
754 log.debug("Activating term {} for device {}", replicaInfo.term(), deviceId);
755 for (int i = 0; i < NUM_BUCKETS; i++) {
756 activateBucket(i);
757 }
758 lifecycleManager.activate(replicaInfo.term());
759 activeTerm = replicaInfo.term();
Jordan Halterman281dbf32018-06-15 17:46:28 -0700760 }
Jordan Halterman281dbf32018-06-15 17:46:28 -0700761 }
762
763 /**
764 * Activates the given bucket number.
765 *
766 * @param bucket the bucket number to activate
767 */
768 private void activateBucket(int bucket) {
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800769 Queue<Runnable> tasks = flowTasks.remove(bucket);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700770 if (tasks != null) {
771 log.debug("Completing enqueued operations for device {}", deviceId);
772 tasks.forEach(task -> task.run());
773 }
774 }
775
776 /**
777 * Handles a lifecycle event.
778 */
779 private void onLifecycleEvent(LifecycleEvent event) {
780 log.debug("Received lifecycle event for device {}: {}", deviceId, event);
781 switch (event.type()) {
782 case TERM_START:
783 startTerm(event.subject());
784 break;
785 case TERM_ACTIVE:
786 activateTerm(event.subject());
787 break;
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700788 case TERM_UPDATE:
789 updateTerm(event.subject());
790 break;
Jordan Halterman281dbf32018-06-15 17:46:28 -0700791 default:
792 break;
793 }
794 }
795
796 /**
797 * Handles a replica change at the start of a new term.
798 */
799 private void startTerm(DeviceReplicaInfo replicaInfo) {
800 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
801 this.replicaInfo = replicaInfo;
802 if (replicaInfo.isMaster(localNodeId)) {
803 log.info("Synchronizing device {} flows for term {}", deviceId, replicaInfo.term());
804 syncFlows(oldReplicaInfo, replicaInfo);
805 }
806 }
807
808 /**
809 * Handles the activation of a term.
810 */
811 private void activateTerm(DeviceReplicaInfo replicaInfo) {
812 if (replicaInfo.term() < this.replicaInfo.term()) {
813 return;
814 }
815 if (replicaInfo.term() > this.replicaInfo.term()) {
816 this.replicaInfo = replicaInfo;
817 }
818
819 // If the local node is neither the master or a backup for the device, clear the flow table.
820 if (!replicaInfo.isMaster(localNodeId) && !replicaInfo.isBackup(localNodeId)) {
821 flowBuckets.values().forEach(bucket -> bucket.clear());
822 }
823 activeTerm = replicaInfo.term();
824 }
825
826 /**
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700827 * Handles an update to a term.
828 */
829 private void updateTerm(DeviceReplicaInfo replicaInfo) {
Jordan Halterman0677d882019-03-07 15:52:34 -0800830 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
831 if (oldReplicaInfo != null && replicaInfo.term() == oldReplicaInfo.term()) {
Jordan Haltermane4bf8562018-06-22 16:58:08 -0700832 this.replicaInfo = replicaInfo;
833
834 // If the local node is neither the master or a backup for the device *and the term is active*,
835 // clear the flow table.
836 if (activeTerm == replicaInfo.term()
837 && !replicaInfo.isMaster(localNodeId)
838 && !replicaInfo.isBackup(localNodeId)) {
839 flowBuckets.values().forEach(bucket -> bucket.clear());
840 }
841 }
842 }
843
844 /**
Jordan Halterman281dbf32018-06-15 17:46:28 -0700845 * Sends a message to the given node wrapped in a Lamport timestamp.
846 * <p>
847 * Messages are sent in a {@link Timestamped} wrapper and are expected to be received in a {@link Timestamped}
848 * wrapper. The internal {@link LogicalClock} is automatically updated on both send and receive.
849 *
850 * @param message the message to send
851 * @param subject the message subject
852 * @param toNodeId the node to which to send the message
853 * @param <M> the message type
854 * @param <R> the response type
855 * @return a future to be completed with the response
856 */
857 private <M, R> CompletableFuture<R> sendWithTimestamp(M message, MessageSubject subject, NodeId toNodeId) {
858 return clusterCommunicator.<Timestamped<M>, Timestamped<R>>sendAndReceive(
859 clock.timestamp(message), subject, SERIALIZER::encode, SERIALIZER::decode, toNodeId)
860 .thenApply(response -> {
861 clock.tick(response.timestamp());
862 return response.value();
863 });
864 }
865
866 /**
867 * Receives messages to the given subject wrapped in Lamport timestamps.
868 * <p>
869 * Messages are expected to be received in a {@link Timestamped} wrapper and are sent back in a {@link Timestamped}
870 * wrapper. The internal {@link LogicalClock} is automatically updated on both receive and send.
871 *
872 * @param subject the subject for which to register the subscriber
873 * @param function the raw message handler
874 * @param <M> the raw message type
875 * @param <R> the raw response type
876 */
877 private <M, R> void receiveWithTimestamp(MessageSubject subject, Function<M, R> function) {
878 clusterCommunicator.<Timestamped<M>, Timestamped<R>>addSubscriber(subject, SERIALIZER::decode, request -> {
879 clock.tick(request.timestamp());
880 return clock.timestamp(function.apply(request.value()));
Jordan Haltermanaeda2752019-02-22 12:31:25 -0800881 }, SERIALIZER::encode, executor);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700882 }
883
884 /**
885 * Registers internal message subscribers.
886 */
887 private void registerSubscribers() {
888 receiveWithTimestamp(getDigestsSubject, v -> getDigests());
889 receiveWithTimestamp(getBucketSubject, this::onGetBucket);
890 receiveWithTimestamp(backupSubject, this::onBackup);
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700891 clusterCommunicator.<BucketId, Set<FlowEntry>>addSubscriber(
892 getFlowsSubject, SERIALIZER::decode, this::getFlowEntries, SERIALIZER::encode);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700893 }
894
895 /**
896 * Unregisters internal message subscribers.
897 */
898 private void unregisterSubscribers() {
899 clusterCommunicator.removeSubscriber(getDigestsSubject);
900 clusterCommunicator.removeSubscriber(getBucketSubject);
901 clusterCommunicator.removeSubscriber(backupSubject);
Jordan Halterman01f3bdd2019-04-17 11:05:16 -0700902 clusterCommunicator.removeSubscriber(getFlowsSubject);
Jordan Halterman281dbf32018-06-15 17:46:28 -0700903 }
904
905 /**
906 * Adds internal event listeners.
907 */
908 private void addListeners() {
909 lifecycleManager.addListener(lifecycleEventListener);
910 }
911
912 /**
913 * Removes internal event listeners.
914 */
915 private void removeListeners() {
916 lifecycleManager.removeListener(lifecycleEventListener);
917 }
918
919 /**
920 * Cancels recurrent scheduled futures.
921 */
922 private synchronized void cancelFutures() {
Jordan Halterman281dbf32018-06-15 17:46:28 -0700923 ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
924 if (antiEntropyFuture != null) {
925 antiEntropyFuture.cancel(false);
926 }
927 }
928
929 /**
Jordan Haltermanda3b9f02018-10-05 13:28:51 -0700930 * Purges the flow table.
931 */
932 public void purge() {
933 flowTasks.clear();
934 flowBuckets.values().forEach(bucket -> bucket.purge());
935 lastBackupTimes.clear();
936 inFlightUpdates.clear();
937 }
938
939 /**
Jordan Halterman281dbf32018-06-15 17:46:28 -0700940 * Closes the device flow table.
941 */
942 public void close() {
943 removeListeners();
944 unregisterSubscribers();
945 cancelFutures();
946 lifecycleManager.close();
947 }
948}