/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.flow.impl;
17
18import java.util.Collection;
19import java.util.LinkedList;
20import java.util.List;
21import java.util.Map;
22import java.util.Queue;
23import java.util.Set;
24import java.util.concurrent.CompletableFuture;
25import java.util.concurrent.ScheduledExecutorService;
26import java.util.concurrent.ScheduledFuture;
27import java.util.concurrent.TimeUnit;
28import java.util.function.BiFunction;
29import java.util.function.Function;
30import java.util.stream.Collectors;
31
32import com.google.common.collect.Maps;
33import com.google.common.collect.Sets;
34import org.onlab.util.KryoNamespace;
35import org.onlab.util.Tools;
36import org.onosproject.cluster.ClusterService;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.net.DeviceId;
39import org.onosproject.net.flow.FlowEntry;
40import org.onosproject.net.flow.FlowId;
41import org.onosproject.net.flow.FlowRule;
42import org.onosproject.net.flow.StoredFlowEntry;
43import org.onosproject.store.LogicalTimestamp;
44import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
45import org.onosproject.store.cluster.messaging.MessageSubject;
46import org.onosproject.store.serializers.KryoNamespaces;
47import org.onosproject.store.service.Serializer;
48import org.slf4j.Logger;
49import org.slf4j.LoggerFactory;
50
51/**
52 * Flow table for all flows associated with a specific device.
53 * <p>
54 * Flows in the table are stored in buckets. Each bucket is mutated and replicated as a single unit. The device flow
55 * table performs communication independent of other device flow tables for more parallelism.
56 * <p>
57 * This implementation uses several different replication protocols. Changes that occur on the device master are
58 * replicated to the backups provided in the {@link DeviceReplicaInfo} for the master's term. Additionally, a periodic
59 * anti-entropy protocol is used to detect missing flows on backups (e.g. due to a node restart). Finally, when a
60 * device mastership change occurs, the new master synchronizes flows with the prior master and/or backups for the
61 * device, allowing mastership to be reassigned to non-backup nodes.
62 */
63public class DeviceFlowTable {
64 private static final int NUM_BUCKETS = 1024;
65 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
66 .register(KryoNamespaces.API)
67 .register(BucketId.class)
68 .register(FlowBucket.class)
69 .register(FlowBucketDigest.class)
70 .register(LogicalTimestamp.class)
71 .register(Timestamped.class)
72 .build());
73
74 private final Logger log = LoggerFactory.getLogger(getClass());
75
76 private final MessageSubject getDigestsSubject;
77 private final MessageSubject getBucketSubject;
78 private final MessageSubject backupSubject;
79
80 private final DeviceId deviceId;
81 private final ClusterCommunicationService clusterCommunicator;
82 private final LifecycleManager lifecycleManager;
83 private final ScheduledExecutorService executorService;
84 private final NodeId localNodeId;
85
86 private final LogicalClock clock = new LogicalClock();
87
88 private volatile DeviceReplicaInfo replicaInfo;
89 private volatile long activeTerm;
90
91 private final LifecycleEventListener lifecycleEventListener = new LifecycleEventListener() {
92 @Override
93 public void event(LifecycleEvent event) {
94 executorService.execute(() -> onLifecycleEvent(event));
95 }
96 };
97
98 private ScheduledFuture<?> backupFuture;
99 private ScheduledFuture<?> antiEntropyFuture;
100
101 private final Map<Integer, Queue<Runnable>> flowTasks = Maps.newConcurrentMap();
102 private final Map<Integer, FlowBucket> flowBuckets = Maps.newConcurrentMap();
103
104 private final Map<BackupOperation, LogicalTimestamp> lastBackupTimes = Maps.newConcurrentMap();
105 private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
106
107 DeviceFlowTable(
108 DeviceId deviceId,
109 ClusterService clusterService,
110 ClusterCommunicationService clusterCommunicator,
111 LifecycleManager lifecycleManager,
112 ScheduledExecutorService executorService,
113 long backupPeriod,
114 long antiEntropyPeriod) {
115 this.deviceId = deviceId;
116 this.clusterCommunicator = clusterCommunicator;
117 this.lifecycleManager = lifecycleManager;
118 this.executorService = executorService;
119 this.localNodeId = clusterService.getLocalNode().id();
120
121 addListeners();
122
123 for (int i = 0; i < NUM_BUCKETS; i++) {
124 flowBuckets.put(i, new FlowBucket(new BucketId(deviceId, i)));
125 }
126
127 getDigestsSubject = new MessageSubject(String.format("flow-store-%s-digests", deviceId));
128 getBucketSubject = new MessageSubject(String.format("flow-store-%s-bucket", deviceId));
129 backupSubject = new MessageSubject(String.format("flow-store-%s-backup", deviceId));
130
131 setBackupPeriod(backupPeriod);
132 setAntiEntropyPeriod(antiEntropyPeriod);
133 registerSubscribers();
134
135 startTerm(lifecycleManager.getReplicaInfo());
136 }
137
138 /**
139 * Sets the flow table backup period.
140 *
141 * @param backupPeriod the flow table backup period in milliseconds
142 */
143 synchronized void setBackupPeriod(long backupPeriod) {
144 ScheduledFuture<?> backupFuture = this.backupFuture;
145 if (backupFuture != null) {
146 backupFuture.cancel(false);
147 }
148 this.backupFuture = executorService.scheduleAtFixedRate(
149 this::backup, backupPeriod, backupPeriod, TimeUnit.MILLISECONDS);
150 }
151
152 /**
153 * Sets the flow table anti-entropy period.
154 *
155 * @param antiEntropyPeriod the flow table anti-entropy period in milliseconds
156 */
157 synchronized void setAntiEntropyPeriod(long antiEntropyPeriod) {
158 ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
159 if (antiEntropyFuture != null) {
160 antiEntropyFuture.cancel(false);
161 }
162 this.antiEntropyFuture = executorService.scheduleAtFixedRate(
163 this::runAntiEntropy, antiEntropyPeriod, antiEntropyPeriod, TimeUnit.MILLISECONDS);
164 }
165
166 /**
167 * Counts the flows in the table.
168 *
169 * @return the total number of flows in the table
170 */
171 public int count() {
172 return flowBuckets.values().stream()
173 .mapToInt(FlowBucket::count)
174 .sum();
175 }
176
177 /**
178 * Returns the flow entry for the given rule.
179 *
180 * @param rule the rule for which to lookup the flow entry
181 * @return the flow entry for the given rule
182 */
183 public StoredFlowEntry getFlowEntry(FlowRule rule) {
184 return getBucket(rule.id())
185 .getFlowEntries(rule.id())
186 .get(rule);
187 }
188
189 /**
190 * Returns the set of flow entries in the table.
191 *
192 * @return the set of flow entries in the table
193 */
194 public Set<FlowEntry> getFlowEntries() {
195 return flowBuckets.values().stream()
196 .flatMap(bucket -> bucket.getFlowBucket().values().stream())
197 .flatMap(entries -> entries.values().stream())
198 .collect(Collectors.toSet());
199 }
200
201 /**
202 * Returns the bucket for the given flow identifier.
203 *
204 * @param flowId the flow identifier
205 * @return the bucket for the given flow identifier
206 */
207 private FlowBucket getBucket(FlowId flowId) {
208 return getBucket(bucket(flowId));
209 }
210
211 /**
212 * Returns the bucket with the given identifier.
213 *
214 * @param bucketId the bucket identifier
215 * @return the bucket with the given identifier
216 */
217 private FlowBucket getBucket(int bucketId) {
218 return flowBuckets.get(bucketId);
219 }
220
221 /**
222 * Returns the bucket number for the given flow identifier.
223 *
224 * @param flowId the flow identifier
225 * @return the bucket number for the given flow identifier
226 */
227 private int bucket(FlowId flowId) {
228 return Math.abs((int) (flowId.id() % NUM_BUCKETS));
229 }
230
231 /**
232 * Returns the digests for all buckets in the flow table for the device.
233 *
234 * @return the set of digests for all buckets for the device
235 */
236 private Set<FlowBucketDigest> getDigests() {
237 return flowBuckets.values()
238 .stream()
239 .map(bucket -> bucket.getDigest())
240 .collect(Collectors.toSet());
241 }
242
243 /**
244 * Returns the digest for the given bucket.
245 *
246 * @param bucket the bucket for which to return the digest
247 * @return the digest for the given bucket
248 */
249 private FlowBucketDigest getDigest(int bucket) {
250 return flowBuckets.get(bucket).getDigest();
251 }
252
253 /**
254 * Adds an entry to the table.
255 *
256 * @param rule the rule to add
257 * @return a future to be completed once the rule has been added
258 */
259 public CompletableFuture<Void> add(FlowEntry rule) {
260 return runInTerm(rule.id(), (bucket, term) -> {
261 bucket.add(rule, term, clock);
262 return null;
263 });
264 }
265
266 /**
267 * Updates an entry in the table.
268 *
269 * @param rule the rule to update
270 * @return a future to be completed once the rule has been updated
271 */
272 public CompletableFuture<Void> update(FlowEntry rule) {
273 return runInTerm(rule.id(), (bucket, term) -> {
274 bucket.update(rule, term, clock);
275 return null;
276 });
277 }
278
279 /**
280 * Applies the given update function to the rule.
281 *
282 * @param rule the rule to update
283 * @param function the update function to apply
284 * @param <T> the result type
285 * @return a future to be completed with the update result or {@code null} if the rule was not updated
286 */
287 public <T> CompletableFuture<T> update(FlowRule rule, Function<StoredFlowEntry, T> function) {
288 return runInTerm(rule.id(), (bucket, term) -> bucket.update(rule, function, term, clock));
289 }
290
291 /**
292 * Removes an entry from the table.
293 *
294 * @param rule the rule to remove
295 * @return a future to be completed once the rule has been removed
296 */
297 public CompletableFuture<FlowEntry> remove(FlowEntry rule) {
298 return runInTerm(rule.id(), (bucket, term) -> bucket.remove(rule, term, clock));
299 }
300
301 /**
302 * Runs the given function in the current term.
303 *
304 * @param flowId the flow identifier indicating the bucket in which to run the function
305 * @param function the function to execute in the current term
306 * @param <T> the future result type
307 * @return a future to be completed with the function result once it has been run
308 */
309 private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> function) {
310 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
311 if (!replicaInfo.isMaster(localNodeId)) {
312 return Tools.exceptionalFuture(new IllegalStateException());
313 }
314
315 FlowBucket bucket = getBucket(flowId);
316
317 // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
318 // the change to be executed once the master has been synchronized.
319 final long term = replicaInfo.term();
320 if (activeTerm < term) {
321 log.debug("Enqueueing operation for device {}", deviceId);
322 synchronized (flowTasks) {
323 // Double checked lock on the active term.
324 if (activeTerm < term) {
325 CompletableFuture<T> future = new CompletableFuture<>();
326 flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
327 .add(() -> future.complete(function.apply(bucket, term)));
328 return future;
329 }
330 }
331 }
332 return CompletableFuture.completedFuture(function.apply(bucket, term));
333 }
334
335 /**
336 * Backs up all buckets in the given device to the given node.
337 */
338 private void backup() {
339 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
340
341 // If the local node is not currently the master, skip the backup.
342 if (!replicaInfo.isMaster(localNodeId)) {
343 return;
344 }
345
346 // Otherwise, iterate through backup nodes and backup the device.
347 for (NodeId nodeId : replicaInfo.backups()) {
348 try {
349 backup(nodeId, replicaInfo.term());
350 } catch (Exception e) {
351 log.error("Backup of " + deviceId + " to " + nodeId + " failed", e);
352 }
353 }
354 }
355
356 /**
357 * Backs up all buckets for the device to the given node.
358 *
359 * @param nodeId the node to which to back up the device
360 * @param term the term for which to backup to the node
361 */
362 private void backup(NodeId nodeId, long term) {
363 for (FlowBucket bucket : flowBuckets.values()) {
364 // If the bucket is not in the current term, skip it. This forces synchronization of the bucket
365 // to occur prior to the new master replicating changes in the bucket to backups.
366 if (bucket.term() != term) {
367 continue;
368 }
369
370 // Record the logical timestamp from the bucket to keep track of the highest logical time replicated.
371 LogicalTimestamp timestamp = bucket.timestamp();
372
373 // If the backup can be run (no concurrent backup to the node in progress) then run it.
374 BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
375 if (startBackup(operation, timestamp)) {
376 backup(bucket, nodeId).whenCompleteAsync((succeeded, error) -> {
377 if (error != null) {
378 log.debug("Backup operation {} failed", operation, error);
379 failBackup(operation);
380 } else if (succeeded) {
381 succeedBackup(operation, timestamp);
382 backup(nodeId, term);
383 } else {
384 log.debug("Backup operation {} failed: term mismatch", operation);
385 failBackup(operation);
386 }
387 }, executorService);
388 }
389 }
390 }
391
392 /**
393 * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
394 * <p>
395 * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
396 * are pending replication for the backup operation.
397 *
398 * @param operation the operation to start
399 * @param timestamp the timestamp for which to start the backup operation
400 * @return indicates whether the given backup operation should be started
401 */
402 private boolean startBackup(BackupOperation operation, LogicalTimestamp timestamp) {
403 LogicalTimestamp lastBackupTime = lastBackupTimes.get(operation);
404 return timestamp != null
405 && (lastBackupTime == null || lastBackupTime.isOlderThan(timestamp))
406 && inFlightUpdates.add(operation);
407 }
408
409 /**
410 * Fails the given backup operation.
411 *
412 * @param operation the backup operation to fail
413 */
414 private void failBackup(BackupOperation operation) {
415 inFlightUpdates.remove(operation);
416 }
417
418 /**
419 * Succeeds the given backup operation.
420 * <p>
421 * The last backup time for the operation will be updated and the operation will be removed from
422 * in-flight updates.
423 *
424 * @param operation the operation to succeed
425 * @param timestamp the timestamp at which the operation was <em>started</em>
426 */
427 private void succeedBackup(BackupOperation operation, LogicalTimestamp timestamp) {
428 lastBackupTimes.put(operation, timestamp);
429 inFlightUpdates.remove(operation);
430 }
431
432 /**
433 * Resets the last completion time for the given backup operation to ensure it's replicated again.
434 *
435 * @param operation the backup operation to reset
436 */
437 private void resetBackup(BackupOperation operation) {
438 lastBackupTimes.remove(operation);
439 }
440
441 /**
442 * Performs the given backup operation.
443 *
444 * @param bucket the bucket to backup
445 * @param nodeId the node to which to backup the bucket
446 * @return a future to be completed with a boolean indicating whether the backup operation was successful
447 */
448 private CompletableFuture<Boolean> backup(FlowBucket bucket, NodeId nodeId) {
449 if (log.isDebugEnabled()) {
450 log.debug("Backing up {} flow entries in bucket {} to {}", bucket.count(), bucket.bucketId(), nodeId);
451 }
452 return sendWithTimestamp(bucket, backupSubject, nodeId);
453 }
454
455 /**
456 * Handles a flow bucket backup from a remote peer.
457 *
458 * @param flowBucket the flow bucket to back up
459 * @return the set of flows that could not be backed up
460 */
461 private boolean onBackup(FlowBucket flowBucket) {
462 if (log.isDebugEnabled()) {
463 log.debug("{} - Received {} flow entries in bucket {} to backup",
464 deviceId, flowBucket.count(), flowBucket.bucketId());
465 }
466
467 try {
468 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
469
470 // If the backup is for a different term, reject the request until we learn about the new term.
471 if (flowBucket.term() != replicaInfo.term()) {
472 log.debug("Term mismatch for device {}: {} != {}", deviceId, flowBucket.term(), replicaInfo);
473 return false;
474 }
475
476 flowBuckets.compute(flowBucket.bucketId().bucket(),
477 (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
478 return true;
479 } catch (Exception e) {
480 log.warn("Failure processing backup request", e);
481 return false;
482 }
483 }
484
485 /**
486 * Runs the anti-entropy protocol.
487 */
488 private void runAntiEntropy() {
489 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
490 if (!replicaInfo.isMaster(localNodeId)) {
491 return;
492 }
493
494 for (NodeId nodeId : replicaInfo.backups()) {
495 runAntiEntropy(nodeId);
496 }
497 }
498
499 /**
500 * Runs the anti-entropy protocol against the given peer.
501 *
502 * @param nodeId the node with which to execute the anti-entropy protocol
503 */
504 private void runAntiEntropy(NodeId nodeId) {
505 requestDigests(nodeId).thenAcceptAsync((digests) -> {
506 // Compute a set of missing BucketIds based on digest times and send them back to the master.
507 for (FlowBucketDigest remoteDigest : digests) {
508 FlowBucket localBucket = getBucket(remoteDigest.bucket());
509 if (localBucket.getDigest().isNewerThan(remoteDigest)) {
510 log.debug("Detected missing flow entries on node {} in bucket {}/{}",
511 nodeId, deviceId, remoteDigest.bucket());
512 resetBackup(new BackupOperation(nodeId, remoteDigest.bucket()));
513 }
514 }
515 }, executorService);
516 }
517
518 /**
519 * Sends a digest request to the given node.
520 *
521 * @param nodeId the node to which to send the request
522 * @return future to be completed with the set of digests for the given device on the given node
523 */
524 private CompletableFuture<Set<FlowBucketDigest>> requestDigests(NodeId nodeId) {
525 return sendWithTimestamp(deviceId, getDigestsSubject, nodeId);
526 }
527
528 /**
529 * Synchronizes flows from the previous master or backups.
530 *
531 * @param prevReplicaInfo the previous replica info
532 * @param newReplicaInfo the new replica info
533 */
534 private void syncFlows(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
535 if (prevReplicaInfo == null) {
536 activateMaster(newReplicaInfo);
537 } else if (prevReplicaInfo.master() != null && !prevReplicaInfo.master().equals(localNodeId)) {
538 syncFlowsOnMaster(prevReplicaInfo, newReplicaInfo);
539 } else {
540 syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
541 }
542 }
543
544 /**
545 * Synchronizes flows from the previous master, falling back to backups if the master fails.
546 *
547 * @param prevReplicaInfo the previous replica info
548 * @param newReplicaInfo the new replica info
549 */
550 private void syncFlowsOnMaster(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
551 syncFlowsOn(prevReplicaInfo.master())
552 .whenCompleteAsync((result, error) -> {
553 if (error != null) {
554 log.debug("Failed to synchronize flows on previous master {}", prevReplicaInfo.master(), error);
555 syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
556 } else {
557 activateMaster(newReplicaInfo);
558 }
559 }, executorService);
560 }
561
562 /**
563 * Synchronizes flows from the previous backups.
564 *
565 * @param prevReplicaInfo the previous replica info
566 * @param newReplicaInfo the new replica info
567 */
568 private void syncFlowsOnBackups(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
569 List<NodeId> backups = prevReplicaInfo.backups()
570 .stream()
571 .filter(nodeId -> !nodeId.equals(localNodeId))
572 .collect(Collectors.toList());
573 syncFlowsOn(backups)
574 .whenCompleteAsync((result, error) -> {
575 if (error != null) {
576 log.debug("Failed to synchronize flows on previous backup nodes {}", backups, error);
577 }
578 activateMaster(newReplicaInfo);
579 }, executorService);
580 }
581
582 /**
583 * Synchronizes flows for the device on the given nodes.
584 *
585 * @param nodes the nodes via which to synchronize the flows
586 * @return a future to be completed once flows have been synchronizes
587 */
588 private CompletableFuture<Void> syncFlowsOn(Collection<NodeId> nodes) {
589 return nodes.isEmpty()
590 ? CompletableFuture.completedFuture(null)
591 : Tools.firstOf(nodes.stream()
592 .map(node -> syncFlowsOn(node))
593 .collect(Collectors.toList()))
594 .thenApply(v -> null);
595 }
596
597 /**
598 * Synchronizes flows for the device from the given node.
599 *
600 * @param nodeId the node from which to synchronize flows
601 * @return a future to be completed once the flows have been synchronizes
602 */
603 private CompletableFuture<Void> syncFlowsOn(NodeId nodeId) {
604 return requestDigests(nodeId)
605 .thenCompose(digests -> Tools.allOf(digests.stream()
606 .filter(digest -> digest.isNewerThan(getDigest(digest.bucket())))
607 .map(digest -> syncBucketOn(nodeId, digest.bucket()))
608 .collect(Collectors.toList())))
609 .thenApply(v -> null);
610 }
611
612 /**
613 * Synchronizes the given bucket on the given node.
614 *
615 * @param nodeId the node on which to synchronize the bucket
616 * @param bucketNumber the bucket to synchronize
617 * @return a future to be completed once the bucket has been synchronizes
618 */
619 private CompletableFuture<Void> syncBucketOn(NodeId nodeId, int bucketNumber) {
620 return requestBucket(nodeId, bucketNumber)
621 .thenAcceptAsync(flowBucket -> {
622 flowBuckets.compute(flowBucket.bucketId().bucket(),
623 (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
624 }, executorService);
625 }
626
627 /**
628 * Requests the given bucket from the given node.
629 *
630 * @param nodeId the node from which to request the bucket
631 * @param bucket the bucket to request
632 * @return a future to be completed with the bucket
633 */
634 private CompletableFuture<FlowBucket> requestBucket(NodeId nodeId, int bucket) {
635 log.debug("Requesting flow bucket {} from {}", bucket, nodeId);
636 return sendWithTimestamp(bucket, getBucketSubject, nodeId);
637 }
638
639 /**
640 * Handles a flow bucket request.
641 *
642 * @param bucket the bucket number
643 * @return the flow bucket
644 */
645 private FlowBucket onGetBucket(int bucket) {
646 return flowBuckets.get(bucket);
647 }
648
649 /**
650 * Activates the new master term.
651 *
652 * @param replicaInfo the new replica info
653 */
654 private void activateMaster(DeviceReplicaInfo replicaInfo) {
655 log.debug("Activating term {} for device {}", replicaInfo.term(), deviceId);
656 for (int i = 0; i < NUM_BUCKETS; i++) {
657 activateBucket(i);
658 }
659 lifecycleManager.activate(replicaInfo.term());
660 activeTerm = replicaInfo.term();
661 }
662
663 /**
664 * Activates the given bucket number.
665 *
666 * @param bucket the bucket number to activate
667 */
668 private void activateBucket(int bucket) {
669 Queue<Runnable> tasks;
670 synchronized (flowTasks) {
671 tasks = flowTasks.remove(bucket);
672 }
673 if (tasks != null) {
674 log.debug("Completing enqueued operations for device {}", deviceId);
675 tasks.forEach(task -> task.run());
676 }
677 }
678
679 /**
680 * Handles a lifecycle event.
681 */
682 private void onLifecycleEvent(LifecycleEvent event) {
683 log.debug("Received lifecycle event for device {}: {}", deviceId, event);
684 switch (event.type()) {
685 case TERM_START:
686 startTerm(event.subject());
687 break;
688 case TERM_ACTIVE:
689 activateTerm(event.subject());
690 break;
691 default:
692 break;
693 }
694 }
695
696 /**
697 * Handles a replica change at the start of a new term.
698 */
699 private void startTerm(DeviceReplicaInfo replicaInfo) {
700 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
701 this.replicaInfo = replicaInfo;
702 if (replicaInfo.isMaster(localNodeId)) {
703 log.info("Synchronizing device {} flows for term {}", deviceId, replicaInfo.term());
704 syncFlows(oldReplicaInfo, replicaInfo);
705 }
706 }
707
708 /**
709 * Handles the activation of a term.
710 */
711 private void activateTerm(DeviceReplicaInfo replicaInfo) {
712 if (replicaInfo.term() < this.replicaInfo.term()) {
713 return;
714 }
715 if (replicaInfo.term() > this.replicaInfo.term()) {
716 this.replicaInfo = replicaInfo;
717 }
718
719 // If the local node is neither the master or a backup for the device, clear the flow table.
720 if (!replicaInfo.isMaster(localNodeId) && !replicaInfo.isBackup(localNodeId)) {
721 flowBuckets.values().forEach(bucket -> bucket.clear());
722 }
723 activeTerm = replicaInfo.term();
724 }
725
726 /**
727 * Sends a message to the given node wrapped in a Lamport timestamp.
728 * <p>
729 * Messages are sent in a {@link Timestamped} wrapper and are expected to be received in a {@link Timestamped}
730 * wrapper. The internal {@link LogicalClock} is automatically updated on both send and receive.
731 *
732 * @param message the message to send
733 * @param subject the message subject
734 * @param toNodeId the node to which to send the message
735 * @param <M> the message type
736 * @param <R> the response type
737 * @return a future to be completed with the response
738 */
739 private <M, R> CompletableFuture<R> sendWithTimestamp(M message, MessageSubject subject, NodeId toNodeId) {
740 return clusterCommunicator.<Timestamped<M>, Timestamped<R>>sendAndReceive(
741 clock.timestamp(message), subject, SERIALIZER::encode, SERIALIZER::decode, toNodeId)
742 .thenApply(response -> {
743 clock.tick(response.timestamp());
744 return response.value();
745 });
746 }
747
748 /**
749 * Receives messages to the given subject wrapped in Lamport timestamps.
750 * <p>
751 * Messages are expected to be received in a {@link Timestamped} wrapper and are sent back in a {@link Timestamped}
752 * wrapper. The internal {@link LogicalClock} is automatically updated on both receive and send.
753 *
754 * @param subject the subject for which to register the subscriber
755 * @param function the raw message handler
756 * @param <M> the raw message type
757 * @param <R> the raw response type
758 */
759 private <M, R> void receiveWithTimestamp(MessageSubject subject, Function<M, R> function) {
760 clusterCommunicator.<Timestamped<M>, Timestamped<R>>addSubscriber(subject, SERIALIZER::decode, request -> {
761 clock.tick(request.timestamp());
762 return clock.timestamp(function.apply(request.value()));
763 }, SERIALIZER::encode, executorService);
764 }
765
766 /**
767 * Registers internal message subscribers.
768 */
769 private void registerSubscribers() {
770 receiveWithTimestamp(getDigestsSubject, v -> getDigests());
771 receiveWithTimestamp(getBucketSubject, this::onGetBucket);
772 receiveWithTimestamp(backupSubject, this::onBackup);
773 }
774
775 /**
776 * Unregisters internal message subscribers.
777 */
778 private void unregisterSubscribers() {
779 clusterCommunicator.removeSubscriber(getDigestsSubject);
780 clusterCommunicator.removeSubscriber(getBucketSubject);
781 clusterCommunicator.removeSubscriber(backupSubject);
782 }
783
784 /**
785 * Adds internal event listeners.
786 */
787 private void addListeners() {
788 lifecycleManager.addListener(lifecycleEventListener);
789 }
790
791 /**
792 * Removes internal event listeners.
793 */
794 private void removeListeners() {
795 lifecycleManager.removeListener(lifecycleEventListener);
796 }
797
798 /**
799 * Cancels recurrent scheduled futures.
800 */
801 private synchronized void cancelFutures() {
802 ScheduledFuture<?> backupFuture = this.backupFuture;
803 if (backupFuture != null) {
804 backupFuture.cancel(false);
805 }
806
807 ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
808 if (antiEntropyFuture != null) {
809 antiEntropyFuture.cancel(false);
810 }
811 }
812
813 /**
814 * Closes the device flow table.
815 */
816 public void close() {
817 removeListeners();
818 unregisterSubscribers();
819 cancelFutures();
820 lifecycleManager.close();
821 }
822}