blob: 7b43f33e24a26ce17223c10d7f579966206b95d8 [file] [log] [blame]
Nikunj Desai31f277e2018-11-27 18:41:24 -05001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
18import java.util.Collection;
19import java.util.LinkedList;
20import java.util.List;
21import java.util.Map;
22import java.util.Queue;
23import java.util.Set;
24import java.util.concurrent.CompletableFuture;
Jordan Halterman1e1a5a22019-02-22 12:31:25 -080025import java.util.concurrent.Executor;
Nikunj Desai31f277e2018-11-27 18:41:24 -050026import java.util.concurrent.ScheduledExecutorService;
27import java.util.concurrent.ScheduledFuture;
28import java.util.concurrent.TimeUnit;
29import java.util.function.BiFunction;
30import java.util.function.Function;
31import java.util.stream.Collectors;
32
33import com.google.common.collect.Maps;
34import com.google.common.collect.Sets;
35import org.onlab.util.KryoNamespace;
36import org.onlab.util.Tools;
37import org.onosproject.cluster.ClusterService;
38import org.onosproject.cluster.NodeId;
39import org.onosproject.net.DeviceId;
40import org.onosproject.net.flow.FlowEntry;
41import org.onosproject.net.flow.FlowId;
42import org.onosproject.net.flow.FlowRule;
43import org.onosproject.net.flow.StoredFlowEntry;
44import org.onosproject.store.LogicalTimestamp;
45import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
46import org.onosproject.store.cluster.messaging.MessageSubject;
47import org.onosproject.store.serializers.KryoNamespaces;
48import org.onosproject.store.service.Serializer;
49import org.slf4j.Logger;
50import org.slf4j.LoggerFactory;
51
52/**
53 * Flow table for all flows associated with a specific device.
54 * <p>
55 * Flows in the table are stored in buckets. Each bucket is mutated and replicated as a single unit. The device flow
56 * table performs communication independent of other device flow tables for more parallelism.
57 * <p>
58 * This implementation uses several different replication protocols. Changes that occur on the device master are
59 * replicated to the backups provided in the {@link DeviceReplicaInfo} for the master's term. Additionally, a periodic
60 * anti-entropy protocol is used to detect missing flows on backups (e.g. due to a node restart). Finally, when a
61 * device mastership change occurs, the new master synchronizes flows with the prior master and/or backups for the
62 * device, allowing mastership to be reassigned to non-backup nodes.
63 */
64public class DeviceFlowTable {
Jordan Halterman1e1a5a22019-02-22 12:31:25 -080065 private static final int NUM_BUCKETS = 128;
Nikunj Desai31f277e2018-11-27 18:41:24 -050066 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
67 .register(KryoNamespaces.API)
68 .register(BucketId.class)
69 .register(FlowBucket.class)
70 .register(FlowBucketDigest.class)
71 .register(LogicalTimestamp.class)
72 .register(Timestamped.class)
73 .build());
74
75 private final Logger log = LoggerFactory.getLogger(getClass());
76
77 private final MessageSubject getDigestsSubject;
78 private final MessageSubject getBucketSubject;
79 private final MessageSubject backupSubject;
80
81 private final DeviceId deviceId;
82 private final ClusterCommunicationService clusterCommunicator;
83 private final LifecycleManager lifecycleManager;
Jordan Halterman1e1a5a22019-02-22 12:31:25 -080084 private final ScheduledExecutorService scheduler;
85 private final Executor executor;
Nikunj Desai31f277e2018-11-27 18:41:24 -050086 private final NodeId localNodeId;
87
88 private final LogicalClock clock = new LogicalClock();
89
90 private volatile DeviceReplicaInfo replicaInfo;
91 private volatile long activeTerm;
92
Jordan Halterman1e1a5a22019-02-22 12:31:25 -080093 private long backupPeriod;
94
Nikunj Desai31f277e2018-11-27 18:41:24 -050095 private final LifecycleEventListener lifecycleEventListener = new LifecycleEventListener() {
96 @Override
97 public void event(LifecycleEvent event) {
Jordan Halterman1e1a5a22019-02-22 12:31:25 -080098 executor.execute(() -> onLifecycleEvent(event));
Nikunj Desai31f277e2018-11-27 18:41:24 -050099 }
100 };
101
Nikunj Desai31f277e2018-11-27 18:41:24 -0500102 private ScheduledFuture<?> antiEntropyFuture;
103
104 private final Map<Integer, Queue<Runnable>> flowTasks = Maps.newConcurrentMap();
105 private final Map<Integer, FlowBucket> flowBuckets = Maps.newConcurrentMap();
106
107 private final Map<BackupOperation, LogicalTimestamp> lastBackupTimes = Maps.newConcurrentMap();
108 private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
109
110 DeviceFlowTable(
111 DeviceId deviceId,
112 ClusterService clusterService,
113 ClusterCommunicationService clusterCommunicator,
114 LifecycleManager lifecycleManager,
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800115 ScheduledExecutorService scheduler,
116 Executor executor,
Nikunj Desai31f277e2018-11-27 18:41:24 -0500117 long backupPeriod,
118 long antiEntropyPeriod) {
119 this.deviceId = deviceId;
120 this.clusterCommunicator = clusterCommunicator;
121 this.lifecycleManager = lifecycleManager;
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800122 this.scheduler = scheduler;
123 this.executor = executor;
Nikunj Desai31f277e2018-11-27 18:41:24 -0500124 this.localNodeId = clusterService.getLocalNode().id();
125
126 addListeners();
127
128 for (int i = 0; i < NUM_BUCKETS; i++) {
129 flowBuckets.put(i, new FlowBucket(new BucketId(deviceId, i)));
130 }
131
132 getDigestsSubject = new MessageSubject(String.format("flow-store-%s-digests", deviceId));
133 getBucketSubject = new MessageSubject(String.format("flow-store-%s-bucket", deviceId));
134 backupSubject = new MessageSubject(String.format("flow-store-%s-backup", deviceId));
135
136 setBackupPeriod(backupPeriod);
137 setAntiEntropyPeriod(antiEntropyPeriod);
138 registerSubscribers();
139
140 startTerm(lifecycleManager.getReplicaInfo());
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800141 scheduleBackups();
Nikunj Desai31f277e2018-11-27 18:41:24 -0500142 }
143
144 /**
145 * Sets the flow table backup period.
146 *
147 * @param backupPeriod the flow table backup period in milliseconds
148 */
149 synchronized void setBackupPeriod(long backupPeriod) {
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800150 this.backupPeriod = backupPeriod;
Nikunj Desai31f277e2018-11-27 18:41:24 -0500151 }
152
153 /**
154 * Sets the flow table anti-entropy period.
155 *
156 * @param antiEntropyPeriod the flow table anti-entropy period in milliseconds
157 */
158 synchronized void setAntiEntropyPeriod(long antiEntropyPeriod) {
159 ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
160 if (antiEntropyFuture != null) {
161 antiEntropyFuture.cancel(false);
162 }
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800163 this.antiEntropyFuture = scheduler.scheduleAtFixedRate(
164 () -> executor.execute(this::runAntiEntropy),
165 antiEntropyPeriod,
166 antiEntropyPeriod,
167 TimeUnit.MILLISECONDS);
Nikunj Desai31f277e2018-11-27 18:41:24 -0500168 }
169
170 /**
171 * Counts the flows in the table.
172 *
173 * @return the total number of flows in the table
174 */
175 public int count() {
176 return flowBuckets.values().stream()
177 .mapToInt(FlowBucket::count)
178 .sum();
179 }
180
181 /**
182 * Returns the flow entry for the given rule.
183 *
184 * @param rule the rule for which to lookup the flow entry
185 * @return the flow entry for the given rule
186 */
187 public StoredFlowEntry getFlowEntry(FlowRule rule) {
188 return getBucket(rule.id())
189 .getFlowEntries(rule.id())
190 .get(rule);
191 }
192
193 /**
194 * Returns the set of flow entries in the table.
195 *
196 * @return the set of flow entries in the table
197 */
198 public Set<FlowEntry> getFlowEntries() {
199 return flowBuckets.values().stream()
200 .flatMap(bucket -> bucket.getFlowBucket().values().stream())
201 .flatMap(entries -> entries.values().stream())
202 .collect(Collectors.toSet());
203 }
204
205 /**
206 * Returns the bucket for the given flow identifier.
207 *
208 * @param flowId the flow identifier
209 * @return the bucket for the given flow identifier
210 */
211 private FlowBucket getBucket(FlowId flowId) {
212 return getBucket(bucket(flowId));
213 }
214
215 /**
216 * Returns the bucket with the given identifier.
217 *
218 * @param bucketId the bucket identifier
219 * @return the bucket with the given identifier
220 */
221 private FlowBucket getBucket(int bucketId) {
222 return flowBuckets.get(bucketId);
223 }
224
225 /**
226 * Returns the bucket number for the given flow identifier.
227 *
228 * @param flowId the flow identifier
229 * @return the bucket number for the given flow identifier
230 */
231 private int bucket(FlowId flowId) {
232 return Math.abs((int) (flowId.id() % NUM_BUCKETS));
233 }
234
235 /**
236 * Returns the digests for all buckets in the flow table for the device.
237 *
238 * @return the set of digests for all buckets for the device
239 */
240 private Set<FlowBucketDigest> getDigests() {
241 return flowBuckets.values()
242 .stream()
243 .map(bucket -> bucket.getDigest())
244 .collect(Collectors.toSet());
245 }
246
247 /**
248 * Returns the digest for the given bucket.
249 *
250 * @param bucket the bucket for which to return the digest
251 * @return the digest for the given bucket
252 */
253 private FlowBucketDigest getDigest(int bucket) {
254 return flowBuckets.get(bucket).getDigest();
255 }
256
257 /**
258 * Adds an entry to the table.
259 *
260 * @param rule the rule to add
261 * @return a future to be completed once the rule has been added
262 */
263 public CompletableFuture<Void> add(FlowEntry rule) {
264 return runInTerm(rule.id(), (bucket, term) -> {
265 bucket.add(rule, term, clock);
266 return null;
267 });
268 }
269
270 /**
271 * Updates an entry in the table.
272 *
273 * @param rule the rule to update
274 * @return a future to be completed once the rule has been updated
275 */
276 public CompletableFuture<Void> update(FlowEntry rule) {
277 return runInTerm(rule.id(), (bucket, term) -> {
278 bucket.update(rule, term, clock);
279 return null;
280 });
281 }
282
283 /**
284 * Applies the given update function to the rule.
285 *
286 * @param rule the rule to update
287 * @param function the update function to apply
288 * @param <T> the result type
289 * @return a future to be completed with the update result or {@code null} if the rule was not updated
290 */
291 public <T> CompletableFuture<T> update(FlowRule rule, Function<StoredFlowEntry, T> function) {
292 return runInTerm(rule.id(), (bucket, term) -> bucket.update(rule, function, term, clock));
293 }
294
295 /**
296 * Removes an entry from the table.
297 *
298 * @param rule the rule to remove
299 * @return a future to be completed once the rule has been removed
300 */
301 public CompletableFuture<FlowEntry> remove(FlowEntry rule) {
302 return runInTerm(rule.id(), (bucket, term) -> bucket.remove(rule, term, clock));
303 }
304
305 /**
306 * Runs the given function in the current term.
307 *
308 * @param flowId the flow identifier indicating the bucket in which to run the function
309 * @param function the function to execute in the current term
310 * @param <T> the future result type
311 * @return a future to be completed with the function result once it has been run
312 */
313 private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> function) {
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800314 CompletableFuture<T> future = new CompletableFuture<>();
315 executor.execute(() -> {
316 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
317 if (!replicaInfo.isMaster(localNodeId)) {
318 future.completeExceptionally(new IllegalStateException());
319 return;
Nikunj Desai31f277e2018-11-27 18:41:24 -0500320 }
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800321
322 FlowBucket bucket = getBucket(flowId);
323
324 // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
325 // the change to be executed once the master has been synchronized.
326 final long term = replicaInfo.term();
327 if (activeTerm < term) {
328 log.debug("Enqueueing operation for device {}", deviceId);
329 flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
330 .add(() -> future.complete(function.apply(bucket, term)));
331 } else {
332 future.complete(function.apply(bucket, term));
333 }
334 });
335 return future;
336 }
337
338 /**
339 * Schedules bucket backups.
340 */
341 private void scheduleBackups() {
342 flowBuckets.values().forEach(bucket -> backupBucket(bucket).whenComplete((result, error) -> {
343 scheduleBackup(bucket);
344 }));
345 }
346
347 /**
348 * Schedules a backup for the given bucket.
349 *
350 * @param bucket the bucket for which to schedule the backup
351 */
352 private void scheduleBackup(FlowBucket bucket) {
353 scheduler.schedule(
354 () -> executor.execute(() -> backupBucket(bucket)),
355 backupPeriod,
356 TimeUnit.MILLISECONDS);
Nikunj Desai31f277e2018-11-27 18:41:24 -0500357 }
358
359 /**
360 * Backs up all buckets in the given device to the given node.
361 */
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800362 private CompletableFuture<Void> backupAll() {
363 CompletableFuture<?>[] futures = flowBuckets.values()
364 .stream()
365 .map(bucket -> backupBucket(bucket))
366 .toArray(CompletableFuture[]::new);
367 return CompletableFuture.allOf(futures);
Nikunj Desai31f277e2018-11-27 18:41:24 -0500368 }
369
370 /**
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800371 * Backs up the given flow bucket.
Nikunj Desai31f277e2018-11-27 18:41:24 -0500372 *
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800373 * @param bucket the flow bucket to backup
Nikunj Desai31f277e2018-11-27 18:41:24 -0500374 */
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800375 private CompletableFuture<Void> backupBucket(FlowBucket bucket) {
376 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
Nikunj Desai31f277e2018-11-27 18:41:24 -0500377
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800378 // Only replicate if the bucket's term matches the replica term and the local node is the current master.
379 // This ensures that the bucket has been synchronized prior to a new master replicating changes to backups.
380 // Only replicate if the local node is the current master.
381 if (bucket.term() == replicaInfo.term() && replicaInfo.isMaster(localNodeId)) {
382 // Replicate the bucket to each of the backup nodes.
383 CompletableFuture<?>[] futures = replicaInfo.backups()
384 .stream()
385 .map(nodeId -> backupBucketToNode(bucket, nodeId))
386 .toArray(CompletableFuture[]::new);
387 return CompletableFuture.allOf(futures);
Nikunj Desai31f277e2018-11-27 18:41:24 -0500388 }
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800389 return CompletableFuture.completedFuture(null);
390 }
391
392 /**
393 * Backs up the given flow bucket to the given node.
394 *
395 * @param bucket the bucket to backup
396 * @param nodeId the node to which to back up the bucket
397 * @return a future to be completed once the bucket has been backed up
398 */
399 private CompletableFuture<Void> backupBucketToNode(FlowBucket bucket, NodeId nodeId) {
400 // Record the logical timestamp from the bucket to keep track of the highest logical time replicated.
401 LogicalTimestamp timestamp = bucket.timestamp();
402
403 // If the backup can be run (no concurrent backup to the node in progress) then run it.
404 BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
405 if (startBackup(operation, timestamp)) {
406 CompletableFuture<Void> future = new CompletableFuture<>();
407 backup(bucket, nodeId).whenCompleteAsync((succeeded, error) -> {
408 if (error != null) {
409 log.debug("Backup operation {} failed", operation, error);
410 failBackup(operation);
411 } else if (succeeded) {
412 succeedBackup(operation, timestamp);
413 } else {
414 log.debug("Backup operation {} failed: term mismatch", operation);
415 failBackup(operation);
416 }
417 future.complete(null);
418 }, executor);
419 return future;
420 }
421 return CompletableFuture.completedFuture(null);
Nikunj Desai31f277e2018-11-27 18:41:24 -0500422 }
423
424 /**
425 * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
426 * <p>
427 * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
428 * are pending replication for the backup operation.
429 *
430 * @param operation the operation to start
431 * @param timestamp the timestamp for which to start the backup operation
432 * @return indicates whether the given backup operation should be started
433 */
434 private boolean startBackup(BackupOperation operation, LogicalTimestamp timestamp) {
435 LogicalTimestamp lastBackupTime = lastBackupTimes.get(operation);
436 return timestamp != null
437 && (lastBackupTime == null || lastBackupTime.isOlderThan(timestamp))
438 && inFlightUpdates.add(operation);
439 }
440
441 /**
442 * Fails the given backup operation.
443 *
444 * @param operation the backup operation to fail
445 */
446 private void failBackup(BackupOperation operation) {
447 inFlightUpdates.remove(operation);
448 }
449
450 /**
451 * Succeeds the given backup operation.
452 * <p>
453 * The last backup time for the operation will be updated and the operation will be removed from
454 * in-flight updates.
455 *
456 * @param operation the operation to succeed
457 * @param timestamp the timestamp at which the operation was <em>started</em>
458 */
459 private void succeedBackup(BackupOperation operation, LogicalTimestamp timestamp) {
460 lastBackupTimes.put(operation, timestamp);
461 inFlightUpdates.remove(operation);
462 }
463
464 /**
465 * Resets the last completion time for the given backup operation to ensure it's replicated again.
466 *
467 * @param operation the backup operation to reset
468 */
469 private void resetBackup(BackupOperation operation) {
470 lastBackupTimes.remove(operation);
471 }
472
473 /**
474 * Performs the given backup operation.
475 *
476 * @param bucket the bucket to backup
477 * @param nodeId the node to which to backup the bucket
478 * @return a future to be completed with a boolean indicating whether the backup operation was successful
479 */
480 private CompletableFuture<Boolean> backup(FlowBucket bucket, NodeId nodeId) {
481 if (log.isDebugEnabled()) {
482 log.debug("Backing up {} flow entries in bucket {} to {}", bucket.count(), bucket.bucketId(), nodeId);
483 }
484 return sendWithTimestamp(bucket, backupSubject, nodeId);
485 }
486
487 /**
488 * Handles a flow bucket backup from a remote peer.
489 *
490 * @param flowBucket the flow bucket to back up
491 * @return the set of flows that could not be backed up
492 */
493 private boolean onBackup(FlowBucket flowBucket) {
494 if (log.isDebugEnabled()) {
495 log.debug("{} - Received {} flow entries in bucket {} to backup",
496 deviceId, flowBucket.count(), flowBucket.bucketId());
497 }
498
499 try {
500 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
501
502 // If the backup is for a different term, reject the request until we learn about the new term.
503 if (flowBucket.term() != replicaInfo.term()) {
504 log.debug("Term mismatch for device {}: {} != {}", deviceId, flowBucket.term(), replicaInfo);
505 return false;
506 }
507
508 flowBuckets.compute(flowBucket.bucketId().bucket(),
509 (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
510 return true;
511 } catch (Exception e) {
512 log.warn("Failure processing backup request", e);
513 return false;
514 }
515 }
516
517 /**
518 * Runs the anti-entropy protocol.
519 */
520 private void runAntiEntropy() {
521 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
522 if (!replicaInfo.isMaster(localNodeId)) {
523 return;
524 }
525
526 for (NodeId nodeId : replicaInfo.backups()) {
527 runAntiEntropy(nodeId);
528 }
529 }
530
531 /**
532 * Runs the anti-entropy protocol against the given peer.
533 *
534 * @param nodeId the node with which to execute the anti-entropy protocol
535 */
536 private void runAntiEntropy(NodeId nodeId) {
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800537 backupAll().whenCompleteAsync((result, error) -> {
538 requestDigests(nodeId).thenAcceptAsync((digests) -> {
539 // Compute a set of missing BucketIds based on digest times and send them back to the master.
540 for (FlowBucketDigest remoteDigest : digests) {
541 FlowBucket localBucket = getBucket(remoteDigest.bucket());
542 if (localBucket.getDigest().isNewerThan(remoteDigest)) {
543 log.debug("Detected missing flow entries on node {} in bucket {}/{}",
544 nodeId, deviceId, remoteDigest.bucket());
545 resetBackup(new BackupOperation(nodeId, remoteDigest.bucket()));
546 }
Nikunj Desai31f277e2018-11-27 18:41:24 -0500547 }
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800548 }, executor);
549 }, executor);
Nikunj Desai31f277e2018-11-27 18:41:24 -0500550 }
551
552 /**
553 * Sends a digest request to the given node.
554 *
555 * @param nodeId the node to which to send the request
556 * @return future to be completed with the set of digests for the given device on the given node
557 */
558 private CompletableFuture<Set<FlowBucketDigest>> requestDigests(NodeId nodeId) {
559 return sendWithTimestamp(deviceId, getDigestsSubject, nodeId);
560 }
561
562 /**
563 * Synchronizes flows from the previous master or backups.
564 *
565 * @param prevReplicaInfo the previous replica info
566 * @param newReplicaInfo the new replica info
567 */
568 private void syncFlows(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
569 if (prevReplicaInfo == null) {
570 activateMaster(newReplicaInfo);
571 } else if (prevReplicaInfo.master() != null && !prevReplicaInfo.master().equals(localNodeId)) {
572 syncFlowsOnMaster(prevReplicaInfo, newReplicaInfo);
573 } else {
574 syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
575 }
576 }
577
578 /**
579 * Synchronizes flows from the previous master, falling back to backups if the master fails.
580 *
581 * @param prevReplicaInfo the previous replica info
582 * @param newReplicaInfo the new replica info
583 */
584 private void syncFlowsOnMaster(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
585 syncFlowsOn(prevReplicaInfo.master())
586 .whenCompleteAsync((result, error) -> {
587 if (error != null) {
588 log.debug("Failed to synchronize flows on previous master {}", prevReplicaInfo.master(), error);
589 syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
590 } else {
591 activateMaster(newReplicaInfo);
592 }
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800593 }, executor);
Nikunj Desai31f277e2018-11-27 18:41:24 -0500594 }
595
596 /**
597 * Synchronizes flows from the previous backups.
598 *
599 * @param prevReplicaInfo the previous replica info
600 * @param newReplicaInfo the new replica info
601 */
602 private void syncFlowsOnBackups(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
603 List<NodeId> backups = prevReplicaInfo.backups()
604 .stream()
605 .filter(nodeId -> !nodeId.equals(localNodeId))
606 .collect(Collectors.toList());
607 syncFlowsOn(backups)
608 .whenCompleteAsync((result, error) -> {
609 if (error != null) {
610 log.debug("Failed to synchronize flows on previous backup nodes {}", backups, error);
611 }
612 activateMaster(newReplicaInfo);
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800613 }, executor);
Nikunj Desai31f277e2018-11-27 18:41:24 -0500614 }
615
616 /**
617 * Synchronizes flows for the device on the given nodes.
618 *
619 * @param nodes the nodes via which to synchronize the flows
620 * @return a future to be completed once flows have been synchronizes
621 */
622 private CompletableFuture<Void> syncFlowsOn(Collection<NodeId> nodes) {
623 return nodes.isEmpty()
624 ? CompletableFuture.completedFuture(null)
625 : Tools.firstOf(nodes.stream()
626 .map(node -> syncFlowsOn(node))
627 .collect(Collectors.toList()))
628 .thenApply(v -> null);
629 }
630
631 /**
632 * Synchronizes flows for the device from the given node.
633 *
634 * @param nodeId the node from which to synchronize flows
635 * @return a future to be completed once the flows have been synchronizes
636 */
637 private CompletableFuture<Void> syncFlowsOn(NodeId nodeId) {
638 return requestDigests(nodeId)
639 .thenCompose(digests -> Tools.allOf(digests.stream()
640 .filter(digest -> digest.isNewerThan(getDigest(digest.bucket())))
641 .map(digest -> syncBucketOn(nodeId, digest.bucket()))
642 .collect(Collectors.toList())))
643 .thenApply(v -> null);
644 }
645
646 /**
647 * Synchronizes the given bucket on the given node.
648 *
649 * @param nodeId the node on which to synchronize the bucket
650 * @param bucketNumber the bucket to synchronize
651 * @return a future to be completed once the bucket has been synchronizes
652 */
653 private CompletableFuture<Void> syncBucketOn(NodeId nodeId, int bucketNumber) {
654 return requestBucket(nodeId, bucketNumber)
655 .thenAcceptAsync(flowBucket -> {
656 flowBuckets.compute(flowBucket.bucketId().bucket(),
657 (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800658 }, executor);
Nikunj Desai31f277e2018-11-27 18:41:24 -0500659 }
660
661 /**
662 * Requests the given bucket from the given node.
663 *
664 * @param nodeId the node from which to request the bucket
665 * @param bucket the bucket to request
666 * @return a future to be completed with the bucket
667 */
668 private CompletableFuture<FlowBucket> requestBucket(NodeId nodeId, int bucket) {
669 log.debug("Requesting flow bucket {} from {}", bucket, nodeId);
670 return sendWithTimestamp(bucket, getBucketSubject, nodeId);
671 }
672
673 /**
674 * Handles a flow bucket request.
675 *
676 * @param bucket the bucket number
677 * @return the flow bucket
678 */
679 private FlowBucket onGetBucket(int bucket) {
680 return flowBuckets.get(bucket);
681 }
682
683 /**
684 * Activates the new master term.
685 *
686 * @param replicaInfo the new replica info
687 */
688 private void activateMaster(DeviceReplicaInfo replicaInfo) {
689 log.debug("Activating term {} for device {}", replicaInfo.term(), deviceId);
690 for (int i = 0; i < NUM_BUCKETS; i++) {
691 activateBucket(i);
692 }
693 lifecycleManager.activate(replicaInfo.term());
694 activeTerm = replicaInfo.term();
695 }
696
697 /**
698 * Activates the given bucket number.
699 *
700 * @param bucket the bucket number to activate
701 */
702 private void activateBucket(int bucket) {
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800703 Queue<Runnable> tasks = flowTasks.remove(bucket);
Nikunj Desai31f277e2018-11-27 18:41:24 -0500704 if (tasks != null) {
705 log.debug("Completing enqueued operations for device {}", deviceId);
706 tasks.forEach(task -> task.run());
707 }
708 }
709
710 /**
711 * Handles a lifecycle event.
712 */
713 private void onLifecycleEvent(LifecycleEvent event) {
714 log.debug("Received lifecycle event for device {}: {}", deviceId, event);
715 switch (event.type()) {
716 case TERM_START:
717 startTerm(event.subject());
718 break;
719 case TERM_ACTIVE:
720 activateTerm(event.subject());
721 break;
722 case TERM_UPDATE:
723 updateTerm(event.subject());
724 break;
725 default:
726 break;
727 }
728 }
729
730 /**
731 * Handles a replica change at the start of a new term.
732 */
733 private void startTerm(DeviceReplicaInfo replicaInfo) {
734 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
735 this.replicaInfo = replicaInfo;
736 if (replicaInfo.isMaster(localNodeId)) {
737 log.info("Synchronizing device {} flows for term {}", deviceId, replicaInfo.term());
738 syncFlows(oldReplicaInfo, replicaInfo);
739 }
740 }
741
742 /**
743 * Handles the activation of a term.
744 */
745 private void activateTerm(DeviceReplicaInfo replicaInfo) {
746 if (replicaInfo.term() < this.replicaInfo.term()) {
747 return;
748 }
749 if (replicaInfo.term() > this.replicaInfo.term()) {
750 this.replicaInfo = replicaInfo;
751 }
752
753 // If the local node is neither the master or a backup for the device, clear the flow table.
754 if (!replicaInfo.isMaster(localNodeId) && !replicaInfo.isBackup(localNodeId)) {
755 flowBuckets.values().forEach(bucket -> bucket.clear());
756 }
757 activeTerm = replicaInfo.term();
758 }
759
760 /**
761 * Handles an update to a term.
762 */
763 private void updateTerm(DeviceReplicaInfo replicaInfo) {
764 if (replicaInfo.term() == this.replicaInfo.term()) {
765 this.replicaInfo = replicaInfo;
766
767 // If the local node is neither the master or a backup for the device *and the term is active*,
768 // clear the flow table.
769 if (activeTerm == replicaInfo.term()
770 && !replicaInfo.isMaster(localNodeId)
771 && !replicaInfo.isBackup(localNodeId)) {
772 flowBuckets.values().forEach(bucket -> bucket.clear());
773 }
774 }
775 }
776
777 /**
778 * Sends a message to the given node wrapped in a Lamport timestamp.
779 * <p>
780 * Messages are sent in a {@link Timestamped} wrapper and are expected to be received in a {@link Timestamped}
781 * wrapper. The internal {@link LogicalClock} is automatically updated on both send and receive.
782 *
783 * @param message the message to send
784 * @param subject the message subject
785 * @param toNodeId the node to which to send the message
786 * @param <M> the message type
787 * @param <R> the response type
788 * @return a future to be completed with the response
789 */
790 private <M, R> CompletableFuture<R> sendWithTimestamp(M message, MessageSubject subject, NodeId toNodeId) {
791 return clusterCommunicator.<Timestamped<M>, Timestamped<R>>sendAndReceive(
792 clock.timestamp(message), subject, SERIALIZER::encode, SERIALIZER::decode, toNodeId)
793 .thenApply(response -> {
794 clock.tick(response.timestamp());
795 return response.value();
796 });
797 }
798
799 /**
800 * Receives messages to the given subject wrapped in Lamport timestamps.
801 * <p>
802 * Messages are expected to be received in a {@link Timestamped} wrapper and are sent back in a {@link Timestamped}
803 * wrapper. The internal {@link LogicalClock} is automatically updated on both receive and send.
804 *
805 * @param subject the subject for which to register the subscriber
806 * @param function the raw message handler
807 * @param <M> the raw message type
808 * @param <R> the raw response type
809 */
810 private <M, R> void receiveWithTimestamp(MessageSubject subject, Function<M, R> function) {
811 clusterCommunicator.<Timestamped<M>, Timestamped<R>>addSubscriber(subject, SERIALIZER::decode, request -> {
812 clock.tick(request.timestamp());
813 return clock.timestamp(function.apply(request.value()));
Jordan Halterman1e1a5a22019-02-22 12:31:25 -0800814 }, SERIALIZER::encode, executor);
Nikunj Desai31f277e2018-11-27 18:41:24 -0500815 }
816
817 /**
818 * Registers internal message subscribers.
819 */
820 private void registerSubscribers() {
821 receiveWithTimestamp(getDigestsSubject, v -> getDigests());
822 receiveWithTimestamp(getBucketSubject, this::onGetBucket);
823 receiveWithTimestamp(backupSubject, this::onBackup);
824 }
825
826 /**
827 * Unregisters internal message subscribers.
828 */
829 private void unregisterSubscribers() {
830 clusterCommunicator.removeSubscriber(getDigestsSubject);
831 clusterCommunicator.removeSubscriber(getBucketSubject);
832 clusterCommunicator.removeSubscriber(backupSubject);
833 }
834
835 /**
836 * Adds internal event listeners.
837 */
838 private void addListeners() {
839 lifecycleManager.addListener(lifecycleEventListener);
840 }
841
842 /**
843 * Removes internal event listeners.
844 */
845 private void removeListeners() {
846 lifecycleManager.removeListener(lifecycleEventListener);
847 }
848
849 /**
850 * Cancels recurrent scheduled futures.
851 */
852 private synchronized void cancelFutures() {
Nikunj Desai31f277e2018-11-27 18:41:24 -0500853 ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
854 if (antiEntropyFuture != null) {
855 antiEntropyFuture.cancel(false);
856 }
857 }
858
859 /**
860 * Purges the flow table.
861 */
862 public void purge() {
863 flowTasks.clear();
864 flowBuckets.values().forEach(bucket -> bucket.purge());
865 lastBackupTimes.clear();
866 inFlightUpdates.clear();
867 }
868
869 /**
870 * Closes the device flow table.
871 */
872 public void close() {
873 removeListeners();
874 unregisterSubscribers();
875 cancelFutures();
876 lifecycleManager.close();
877 }
878}