blob: 1a5085df801123cbd94405ff99be0aa22f962805 [file] [log] [blame]
Jordan Halterman356cda52018-06-15 17:46:28 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
18import java.util.Collection;
Jordan Halterman9b49e342019-04-17 11:05:16 -070019import java.util.Collections;
Jordan Halterman356cda52018-06-15 17:46:28 -070020import java.util.LinkedList;
21import java.util.List;
22import java.util.Map;
23import java.util.Queue;
24import java.util.Set;
25import java.util.concurrent.CompletableFuture;
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -080026import java.util.concurrent.Executor;
Jordan Halterman356cda52018-06-15 17:46:28 -070027import java.util.concurrent.ScheduledExecutorService;
28import java.util.concurrent.ScheduledFuture;
29import java.util.concurrent.TimeUnit;
30import java.util.function.BiFunction;
31import java.util.function.Function;
32import java.util.stream.Collectors;
33
Jordan Halterman9b49e342019-04-17 11:05:16 -070034import com.google.common.collect.Iterables;
Jordan Halterman356cda52018-06-15 17:46:28 -070035import com.google.common.collect.Maps;
36import com.google.common.collect.Sets;
37import org.onlab.util.KryoNamespace;
38import org.onlab.util.Tools;
39import org.onosproject.cluster.ClusterService;
40import org.onosproject.cluster.NodeId;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.flow.FlowEntry;
43import org.onosproject.net.flow.FlowId;
44import org.onosproject.net.flow.FlowRule;
45import org.onosproject.net.flow.StoredFlowEntry;
46import org.onosproject.store.LogicalTimestamp;
47import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
48import org.onosproject.store.cluster.messaging.MessageSubject;
49import org.onosproject.store.serializers.KryoNamespaces;
50import org.onosproject.store.service.Serializer;
51import org.slf4j.Logger;
52import org.slf4j.LoggerFactory;
53
/**
 * Flow table for all flows associated with a specific device.
 * <p>
 * Flows in the table are stored in buckets. Each bucket is mutated and replicated as a single unit. The device flow
 * table performs communication independent of other device flow tables for more parallelism.
 * <p>
 * This implementation uses several different replication protocols. Changes that occur on the device master are
 * replicated to the backups provided in the {@link DeviceReplicaInfo} for the master's term. Additionally, a periodic
 * anti-entropy protocol is used to detect missing flows on backups (e.g. due to a node restart). Finally, when a
 * device mastership change occurs, the new master synchronizes flows with the prior master and/or backups for the
 * device, allowing mastership to be reassigned to non-backup nodes.
 */
66public class DeviceFlowTable {
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -080067 private static final int NUM_BUCKETS = 128;
Jordan Halterman356cda52018-06-15 17:46:28 -070068 private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
69 .register(KryoNamespaces.API)
70 .register(BucketId.class)
71 .register(FlowBucket.class)
72 .register(FlowBucketDigest.class)
73 .register(LogicalTimestamp.class)
74 .register(Timestamped.class)
75 .build());
76
77 private final Logger log = LoggerFactory.getLogger(getClass());
78
79 private final MessageSubject getDigestsSubject;
80 private final MessageSubject getBucketSubject;
81 private final MessageSubject backupSubject;
Jordan Halterman9b49e342019-04-17 11:05:16 -070082 private final MessageSubject getFlowsSubject;
Jordan Halterman356cda52018-06-15 17:46:28 -070083
84 private final DeviceId deviceId;
85 private final ClusterCommunicationService clusterCommunicator;
86 private final LifecycleManager lifecycleManager;
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -080087 private final ScheduledExecutorService scheduler;
88 private final Executor executor;
Jordan Halterman356cda52018-06-15 17:46:28 -070089 private final NodeId localNodeId;
90
91 private final LogicalClock clock = new LogicalClock();
92
93 private volatile DeviceReplicaInfo replicaInfo;
94 private volatile long activeTerm;
95
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -080096 private long backupPeriod;
97
Jordan Halterman356cda52018-06-15 17:46:28 -070098 private final LifecycleEventListener lifecycleEventListener = new LifecycleEventListener() {
99 @Override
100 public void event(LifecycleEvent event) {
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800101 executor.execute(() -> onLifecycleEvent(event));
Jordan Halterman356cda52018-06-15 17:46:28 -0700102 }
103 };
104
Jordan Halterman356cda52018-06-15 17:46:28 -0700105 private ScheduledFuture<?> antiEntropyFuture;
106
107 private final Map<Integer, Queue<Runnable>> flowTasks = Maps.newConcurrentMap();
108 private final Map<Integer, FlowBucket> flowBuckets = Maps.newConcurrentMap();
109
110 private final Map<BackupOperation, LogicalTimestamp> lastBackupTimes = Maps.newConcurrentMap();
111 private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();
112
/**
 * Creates the flow table for a single device.
 * <p>
 * The constructor creates one {@link FlowBucket} per bucket number, derives the per-device message
 * subjects, invokes {@code addListeners()} and {@code registerSubscribers()}, applies the backup and
 * anti-entropy periods, kicks off bucket backups and finally calls {@code activateMaster} with the
 * replica info read from the lifecycle manager.
 *
 * @param deviceId the device to which the table belongs
 * @param clusterService the cluster service, used to resolve the local node identifier
 * @param clusterCommunicator the cluster communication service used for replication messages
 * @param lifecycleManager the lifecycle manager providing replica info for the device
 * @param scheduler the scheduler used for delayed and periodic work
 * @param executor the executor on which table operations are run
 * @param backupPeriod the flow table backup period in milliseconds
 * @param antiEntropyPeriod the flow table anti-entropy period in milliseconds
 */
DeviceFlowTable(
        DeviceId deviceId,
        ClusterService clusterService,
        ClusterCommunicationService clusterCommunicator,
        LifecycleManager lifecycleManager,
        ScheduledExecutorService scheduler,
        Executor executor,
        long backupPeriod,
        long antiEntropyPeriod) {
    this.deviceId = deviceId;
    this.clusterCommunicator = clusterCommunicator;
    this.lifecycleManager = lifecycleManager;
    this.scheduler = scheduler;
    this.executor = executor;
    this.localNodeId = clusterService.getLocalNode().id();
    this.replicaInfo = lifecycleManager.getReplicaInfo();

    // Create a bucket for every bucket number in [0, NUM_BUCKETS).
    for (int bucketNumber = 0; bucketNumber < NUM_BUCKETS; bucketNumber++) {
        flowBuckets.put(bucketNumber, new FlowBucket(new BucketId(deviceId, bucketNumber)));
    }

    // Message subjects are scoped to the device so each table communicates independently.
    this.getDigestsSubject = new MessageSubject(String.format("flow-store-%s-digests", deviceId));
    this.getBucketSubject = new MessageSubject(String.format("flow-store-%s-bucket", deviceId));
    this.backupSubject = new MessageSubject(String.format("flow-store-%s-backup", deviceId));
    this.getFlowsSubject = new MessageSubject(String.format("flow-store-%s-flows", deviceId));

    addListeners();

    setBackupPeriod(backupPeriod);
    setAntiEntropyPeriod(antiEntropyPeriod);
    registerSubscribers();

    scheduleBackups();

    activateMaster(replicaInfo);
}
149
150 /**
151 * Sets the flow table backup period.
152 *
153 * @param backupPeriod the flow table backup period in milliseconds
154 */
155 synchronized void setBackupPeriod(long backupPeriod) {
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800156 this.backupPeriod = backupPeriod;
Jordan Halterman356cda52018-06-15 17:46:28 -0700157 }
158
159 /**
160 * Sets the flow table anti-entropy period.
161 *
162 * @param antiEntropyPeriod the flow table anti-entropy period in milliseconds
163 */
164 synchronized void setAntiEntropyPeriod(long antiEntropyPeriod) {
165 ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
166 if (antiEntropyFuture != null) {
167 antiEntropyFuture.cancel(false);
168 }
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800169 this.antiEntropyFuture = scheduler.scheduleAtFixedRate(
170 () -> executor.execute(this::runAntiEntropy),
171 antiEntropyPeriod,
172 antiEntropyPeriod,
173 TimeUnit.MILLISECONDS);
Jordan Halterman356cda52018-06-15 17:46:28 -0700174 }
175
176 /**
177 * Counts the flows in the table.
178 *
179 * @return the total number of flows in the table
180 */
181 public int count() {
182 return flowBuckets.values().stream()
183 .mapToInt(FlowBucket::count)
184 .sum();
185 }
186
187 /**
188 * Returns the flow entry for the given rule.
189 *
190 * @param rule the rule for which to lookup the flow entry
191 * @return the flow entry for the given rule
192 */
193 public StoredFlowEntry getFlowEntry(FlowRule rule) {
194 return getBucket(rule.id())
195 .getFlowEntries(rule.id())
196 .get(rule);
197 }
198
199 /**
200 * Returns the set of flow entries in the table.
201 *
202 * @return the set of flow entries in the table
203 */
Jordan Halterman9b49e342019-04-17 11:05:16 -0700204 public CompletableFuture<Iterable<FlowEntry>> getFlowEntries() {
205 // Fetch the entries for each bucket in parallel and then concatenate the sets
206 // to create a single iterable.
207 return Tools.allOf(flowBuckets.values()
208 .stream()
209 .map(this::getFlowEntries)
210 .collect(Collectors.toList()))
211 .thenApply(Iterables::concat);
212 }
213
214 /**
215 * Fetches the set of flow entries in the given bucket.
216 *
217 * @param bucketId the bucket for which to fetch flow entries
218 * @return a future to be completed once the flow entries have been retrieved
219 */
220 private CompletableFuture<Set<FlowEntry>> getFlowEntries(BucketId bucketId) {
221 return getFlowEntries(getBucket(bucketId.bucket()));
222 }
223
224 /**
225 * Fetches the set of flow entries in the given bucket.
226 *
227 * @param bucket the bucket for which to fetch flow entries
228 * @return a future to be completed once the flow entries have been retrieved
229 */
230 private CompletableFuture<Set<FlowEntry>> getFlowEntries(FlowBucket bucket) {
231 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
232
233 // If the local node is the master, fetch the entries locally. Otherwise, request the entries
234 // from the current master. Note that there's a change of a brief cycle during a mastership change.
235 if (replicaInfo.isMaster(localNodeId)) {
236 return CompletableFuture.completedFuture(
237 bucket.getFlowBucket().values().stream()
238 .flatMap(entries -> entries.values().stream())
239 .collect(Collectors.toSet()));
240 } else if (replicaInfo.master() != null) {
241 return clusterCommunicator.sendAndReceive(
242 bucket.bucketId(),
243 getFlowsSubject,
244 SERIALIZER::encode,
245 SERIALIZER::decode,
246 replicaInfo.master());
247 } else {
248 return CompletableFuture.completedFuture(Collections.emptySet());
249 }
Jordan Halterman356cda52018-06-15 17:46:28 -0700250 }
251
252 /**
253 * Returns the bucket for the given flow identifier.
254 *
255 * @param flowId the flow identifier
256 * @return the bucket for the given flow identifier
257 */
258 private FlowBucket getBucket(FlowId flowId) {
259 return getBucket(bucket(flowId));
260 }
261
262 /**
263 * Returns the bucket with the given identifier.
264 *
265 * @param bucketId the bucket identifier
266 * @return the bucket with the given identifier
267 */
268 private FlowBucket getBucket(int bucketId) {
269 return flowBuckets.get(bucketId);
270 }
271
272 /**
273 * Returns the bucket number for the given flow identifier.
274 *
275 * @param flowId the flow identifier
276 * @return the bucket number for the given flow identifier
277 */
278 private int bucket(FlowId flowId) {
279 return Math.abs((int) (flowId.id() % NUM_BUCKETS));
280 }
281
282 /**
283 * Returns the digests for all buckets in the flow table for the device.
284 *
285 * @return the set of digests for all buckets for the device
286 */
287 private Set<FlowBucketDigest> getDigests() {
288 return flowBuckets.values()
289 .stream()
290 .map(bucket -> bucket.getDigest())
291 .collect(Collectors.toSet());
292 }
293
294 /**
295 * Returns the digest for the given bucket.
296 *
297 * @param bucket the bucket for which to return the digest
298 * @return the digest for the given bucket
299 */
300 private FlowBucketDigest getDigest(int bucket) {
301 return flowBuckets.get(bucket).getDigest();
302 }
303
304 /**
305 * Adds an entry to the table.
306 *
307 * @param rule the rule to add
308 * @return a future to be completed once the rule has been added
309 */
310 public CompletableFuture<Void> add(FlowEntry rule) {
311 return runInTerm(rule.id(), (bucket, term) -> {
312 bucket.add(rule, term, clock);
313 return null;
314 });
315 }
316
317 /**
318 * Updates an entry in the table.
319 *
320 * @param rule the rule to update
321 * @return a future to be completed once the rule has been updated
322 */
323 public CompletableFuture<Void> update(FlowEntry rule) {
324 return runInTerm(rule.id(), (bucket, term) -> {
325 bucket.update(rule, term, clock);
326 return null;
327 });
328 }
329
330 /**
331 * Applies the given update function to the rule.
332 *
333 * @param rule the rule to update
334 * @param function the update function to apply
335 * @param <T> the result type
336 * @return a future to be completed with the update result or {@code null} if the rule was not updated
337 */
338 public <T> CompletableFuture<T> update(FlowRule rule, Function<StoredFlowEntry, T> function) {
339 return runInTerm(rule.id(), (bucket, term) -> bucket.update(rule, function, term, clock));
340 }
341
342 /**
343 * Removes an entry from the table.
344 *
345 * @param rule the rule to remove
346 * @return a future to be completed once the rule has been removed
347 */
348 public CompletableFuture<FlowEntry> remove(FlowEntry rule) {
349 return runInTerm(rule.id(), (bucket, term) -> bucket.remove(rule, term, clock));
350 }
351
352 /**
353 * Runs the given function in the current term.
354 *
355 * @param flowId the flow identifier indicating the bucket in which to run the function
356 * @param function the function to execute in the current term
357 * @param <T> the future result type
358 * @return a future to be completed with the function result once it has been run
359 */
360 private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> function) {
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800361 CompletableFuture<T> future = new CompletableFuture<>();
362 executor.execute(() -> {
363 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
364 if (!replicaInfo.isMaster(localNodeId)) {
365 future.completeExceptionally(new IllegalStateException());
366 return;
Jordan Halterman356cda52018-06-15 17:46:28 -0700367 }
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800368
369 FlowBucket bucket = getBucket(flowId);
370
371 // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
372 // the change to be executed once the master has been synchronized.
373 final long term = replicaInfo.term();
374 if (activeTerm < term) {
375 log.debug("Enqueueing operation for device {}", deviceId);
376 flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
377 .add(() -> future.complete(function.apply(bucket, term)));
378 } else {
379 future.complete(function.apply(bucket, term));
380 }
381 });
382 return future;
383 }
384
385 /**
386 * Schedules bucket backups.
387 */
388 private void scheduleBackups() {
389 flowBuckets.values().forEach(bucket -> backupBucket(bucket).whenComplete((result, error) -> {
390 scheduleBackup(bucket);
391 }));
392 }
393
394 /**
395 * Schedules a backup for the given bucket.
396 *
397 * @param bucket the bucket for which to schedule the backup
398 */
399 private void scheduleBackup(FlowBucket bucket) {
400 scheduler.schedule(
401 () -> executor.execute(() -> backupBucket(bucket)),
402 backupPeriod,
403 TimeUnit.MILLISECONDS);
Jordan Halterman356cda52018-06-15 17:46:28 -0700404 }
405
406 /**
407 * Backs up all buckets in the given device to the given node.
408 */
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800409 private CompletableFuture<Void> backupAll() {
410 CompletableFuture<?>[] futures = flowBuckets.values()
411 .stream()
412 .map(bucket -> backupBucket(bucket))
413 .toArray(CompletableFuture[]::new);
414 return CompletableFuture.allOf(futures);
Jordan Halterman356cda52018-06-15 17:46:28 -0700415 }
416
417 /**
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800418 * Backs up the given flow bucket.
Jordan Halterman356cda52018-06-15 17:46:28 -0700419 *
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800420 * @param bucket the flow bucket to backup
Jordan Halterman356cda52018-06-15 17:46:28 -0700421 */
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800422 private CompletableFuture<Void> backupBucket(FlowBucket bucket) {
423 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
Jordan Halterman356cda52018-06-15 17:46:28 -0700424
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800425 // Only replicate if the bucket's term matches the replica term and the local node is the current master.
426 // This ensures that the bucket has been synchronized prior to a new master replicating changes to backups.
427 // Only replicate if the local node is the current master.
428 if (bucket.term() == replicaInfo.term() && replicaInfo.isMaster(localNodeId)) {
429 // Replicate the bucket to each of the backup nodes.
430 CompletableFuture<?>[] futures = replicaInfo.backups()
431 .stream()
432 .map(nodeId -> backupBucketToNode(bucket, nodeId))
433 .toArray(CompletableFuture[]::new);
434 return CompletableFuture.allOf(futures);
Jordan Halterman356cda52018-06-15 17:46:28 -0700435 }
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800436 return CompletableFuture.completedFuture(null);
437 }
438
439 /**
440 * Backs up the given flow bucket to the given node.
441 *
442 * @param bucket the bucket to backup
443 * @param nodeId the node to which to back up the bucket
444 * @return a future to be completed once the bucket has been backed up
445 */
446 private CompletableFuture<Void> backupBucketToNode(FlowBucket bucket, NodeId nodeId) {
447 // Record the logical timestamp from the bucket to keep track of the highest logical time replicated.
448 LogicalTimestamp timestamp = bucket.timestamp();
449
450 // If the backup can be run (no concurrent backup to the node in progress) then run it.
451 BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
452 if (startBackup(operation, timestamp)) {
453 CompletableFuture<Void> future = new CompletableFuture<>();
454 backup(bucket, nodeId).whenCompleteAsync((succeeded, error) -> {
455 if (error != null) {
456 log.debug("Backup operation {} failed", operation, error);
457 failBackup(operation);
458 } else if (succeeded) {
459 succeedBackup(operation, timestamp);
460 } else {
461 log.debug("Backup operation {} failed: term mismatch", operation);
462 failBackup(operation);
463 }
464 future.complete(null);
465 }, executor);
466 return future;
467 }
468 return CompletableFuture.completedFuture(null);
Jordan Halterman356cda52018-06-15 17:46:28 -0700469 }
470
471 /**
472 * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
473 * <p>
474 * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
475 * are pending replication for the backup operation.
476 *
477 * @param operation the operation to start
478 * @param timestamp the timestamp for which to start the backup operation
479 * @return indicates whether the given backup operation should be started
480 */
481 private boolean startBackup(BackupOperation operation, LogicalTimestamp timestamp) {
482 LogicalTimestamp lastBackupTime = lastBackupTimes.get(operation);
483 return timestamp != null
484 && (lastBackupTime == null || lastBackupTime.isOlderThan(timestamp))
485 && inFlightUpdates.add(operation);
486 }
487
488 /**
489 * Fails the given backup operation.
490 *
491 * @param operation the backup operation to fail
492 */
493 private void failBackup(BackupOperation operation) {
494 inFlightUpdates.remove(operation);
495 }
496
497 /**
498 * Succeeds the given backup operation.
499 * <p>
500 * The last backup time for the operation will be updated and the operation will be removed from
501 * in-flight updates.
502 *
503 * @param operation the operation to succeed
504 * @param timestamp the timestamp at which the operation was <em>started</em>
505 */
506 private void succeedBackup(BackupOperation operation, LogicalTimestamp timestamp) {
507 lastBackupTimes.put(operation, timestamp);
508 inFlightUpdates.remove(operation);
509 }
510
511 /**
512 * Resets the last completion time for the given backup operation to ensure it's replicated again.
513 *
514 * @param operation the backup operation to reset
515 */
516 private void resetBackup(BackupOperation operation) {
517 lastBackupTimes.remove(operation);
518 }
519
520 /**
521 * Performs the given backup operation.
522 *
523 * @param bucket the bucket to backup
524 * @param nodeId the node to which to backup the bucket
525 * @return a future to be completed with a boolean indicating whether the backup operation was successful
526 */
527 private CompletableFuture<Boolean> backup(FlowBucket bucket, NodeId nodeId) {
528 if (log.isDebugEnabled()) {
529 log.debug("Backing up {} flow entries in bucket {} to {}", bucket.count(), bucket.bucketId(), nodeId);
530 }
531 return sendWithTimestamp(bucket, backupSubject, nodeId);
532 }
533
534 /**
535 * Handles a flow bucket backup from a remote peer.
536 *
537 * @param flowBucket the flow bucket to back up
538 * @return the set of flows that could not be backed up
539 */
540 private boolean onBackup(FlowBucket flowBucket) {
541 if (log.isDebugEnabled()) {
542 log.debug("{} - Received {} flow entries in bucket {} to backup",
543 deviceId, flowBucket.count(), flowBucket.bucketId());
544 }
545
546 try {
547 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
548
549 // If the backup is for a different term, reject the request until we learn about the new term.
550 if (flowBucket.term() != replicaInfo.term()) {
551 log.debug("Term mismatch for device {}: {} != {}", deviceId, flowBucket.term(), replicaInfo);
552 return false;
553 }
554
555 flowBuckets.compute(flowBucket.bucketId().bucket(),
556 (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
557 return true;
558 } catch (Exception e) {
559 log.warn("Failure processing backup request", e);
560 return false;
561 }
562 }
563
564 /**
565 * Runs the anti-entropy protocol.
566 */
567 private void runAntiEntropy() {
568 DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
569 if (!replicaInfo.isMaster(localNodeId)) {
570 return;
571 }
572
573 for (NodeId nodeId : replicaInfo.backups()) {
574 runAntiEntropy(nodeId);
575 }
576 }
577
578 /**
579 * Runs the anti-entropy protocol against the given peer.
580 *
581 * @param nodeId the node with which to execute the anti-entropy protocol
582 */
583 private void runAntiEntropy(NodeId nodeId) {
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800584 backupAll().whenCompleteAsync((result, error) -> {
585 requestDigests(nodeId).thenAcceptAsync((digests) -> {
586 // Compute a set of missing BucketIds based on digest times and send them back to the master.
587 for (FlowBucketDigest remoteDigest : digests) {
588 FlowBucket localBucket = getBucket(remoteDigest.bucket());
589 if (localBucket.getDigest().isNewerThan(remoteDigest)) {
590 log.debug("Detected missing flow entries on node {} in bucket {}/{}",
591 nodeId, deviceId, remoteDigest.bucket());
592 resetBackup(new BackupOperation(nodeId, remoteDigest.bucket()));
593 }
Jordan Halterman356cda52018-06-15 17:46:28 -0700594 }
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800595 }, executor);
596 }, executor);
Jordan Halterman356cda52018-06-15 17:46:28 -0700597 }
598
599 /**
600 * Sends a digest request to the given node.
601 *
602 * @param nodeId the node to which to send the request
603 * @return future to be completed with the set of digests for the given device on the given node
604 */
605 private CompletableFuture<Set<FlowBucketDigest>> requestDigests(NodeId nodeId) {
606 return sendWithTimestamp(deviceId, getDigestsSubject, nodeId);
607 }
608
609 /**
610 * Synchronizes flows from the previous master or backups.
611 *
612 * @param prevReplicaInfo the previous replica info
613 * @param newReplicaInfo the new replica info
614 */
615 private void syncFlows(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
616 if (prevReplicaInfo == null) {
617 activateMaster(newReplicaInfo);
618 } else if (prevReplicaInfo.master() != null && !prevReplicaInfo.master().equals(localNodeId)) {
619 syncFlowsOnMaster(prevReplicaInfo, newReplicaInfo);
620 } else {
621 syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
622 }
623 }
624
625 /**
626 * Synchronizes flows from the previous master, falling back to backups if the master fails.
627 *
628 * @param prevReplicaInfo the previous replica info
629 * @param newReplicaInfo the new replica info
630 */
631 private void syncFlowsOnMaster(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
632 syncFlowsOn(prevReplicaInfo.master())
633 .whenCompleteAsync((result, error) -> {
634 if (error != null) {
635 log.debug("Failed to synchronize flows on previous master {}", prevReplicaInfo.master(), error);
636 syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
637 } else {
638 activateMaster(newReplicaInfo);
639 }
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800640 }, executor);
Jordan Halterman356cda52018-06-15 17:46:28 -0700641 }
642
643 /**
644 * Synchronizes flows from the previous backups.
645 *
646 * @param prevReplicaInfo the previous replica info
647 * @param newReplicaInfo the new replica info
648 */
649 private void syncFlowsOnBackups(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
650 List<NodeId> backups = prevReplicaInfo.backups()
651 .stream()
652 .filter(nodeId -> !nodeId.equals(localNodeId))
653 .collect(Collectors.toList());
654 syncFlowsOn(backups)
655 .whenCompleteAsync((result, error) -> {
656 if (error != null) {
657 log.debug("Failed to synchronize flows on previous backup nodes {}", backups, error);
658 }
659 activateMaster(newReplicaInfo);
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800660 }, executor);
Jordan Halterman356cda52018-06-15 17:46:28 -0700661 }
662
663 /**
664 * Synchronizes flows for the device on the given nodes.
665 *
666 * @param nodes the nodes via which to synchronize the flows
667 * @return a future to be completed once flows have been synchronizes
668 */
669 private CompletableFuture<Void> syncFlowsOn(Collection<NodeId> nodes) {
670 return nodes.isEmpty()
671 ? CompletableFuture.completedFuture(null)
672 : Tools.firstOf(nodes.stream()
673 .map(node -> syncFlowsOn(node))
674 .collect(Collectors.toList()))
675 .thenApply(v -> null);
676 }
677
678 /**
679 * Synchronizes flows for the device from the given node.
680 *
681 * @param nodeId the node from which to synchronize flows
682 * @return a future to be completed once the flows have been synchronizes
683 */
684 private CompletableFuture<Void> syncFlowsOn(NodeId nodeId) {
685 return requestDigests(nodeId)
686 .thenCompose(digests -> Tools.allOf(digests.stream()
687 .filter(digest -> digest.isNewerThan(getDigest(digest.bucket())))
688 .map(digest -> syncBucketOn(nodeId, digest.bucket()))
689 .collect(Collectors.toList())))
690 .thenApply(v -> null);
691 }
692
693 /**
694 * Synchronizes the given bucket on the given node.
695 *
696 * @param nodeId the node on which to synchronize the bucket
697 * @param bucketNumber the bucket to synchronize
698 * @return a future to be completed once the bucket has been synchronizes
699 */
700 private CompletableFuture<Void> syncBucketOn(NodeId nodeId, int bucketNumber) {
701 return requestBucket(nodeId, bucketNumber)
702 .thenAcceptAsync(flowBucket -> {
703 flowBuckets.compute(flowBucket.bucketId().bucket(),
704 (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800705 }, executor);
Jordan Halterman356cda52018-06-15 17:46:28 -0700706 }
707
708 /**
709 * Requests the given bucket from the given node.
710 *
711 * @param nodeId the node from which to request the bucket
712 * @param bucket the bucket to request
713 * @return a future to be completed with the bucket
714 */
715 private CompletableFuture<FlowBucket> requestBucket(NodeId nodeId, int bucket) {
716 log.debug("Requesting flow bucket {} from {}", bucket, nodeId);
717 return sendWithTimestamp(bucket, getBucketSubject, nodeId);
718 }
719
720 /**
721 * Handles a flow bucket request.
722 *
Jordan Halterman4c3a0452018-06-30 23:44:30 -0700723 * @param bucketId the bucket number
Jordan Halterman356cda52018-06-15 17:46:28 -0700724 * @return the flow bucket
725 */
Jordan Halterman4c3a0452018-06-30 23:44:30 -0700726 private FlowBucket onGetBucket(int bucketId) {
727 return flowBuckets.get(bucketId).copy();
Jordan Halterman356cda52018-06-15 17:46:28 -0700728 }
729
730 /**
731 * Activates the new master term.
732 *
733 * @param replicaInfo the new replica info
734 */
735 private void activateMaster(DeviceReplicaInfo replicaInfo) {
Jordan Halterman62022ef2019-03-08 10:48:22 -0800736 if (replicaInfo.isMaster(localNodeId)) {
737 log.debug("Activating term {} for device {}", replicaInfo.term(), deviceId);
738 for (int i = 0; i < NUM_BUCKETS; i++) {
739 activateBucket(i);
740 }
741 lifecycleManager.activate(replicaInfo.term());
742 activeTerm = replicaInfo.term();
Jordan Halterman356cda52018-06-15 17:46:28 -0700743 }
Jordan Halterman356cda52018-06-15 17:46:28 -0700744 }
745
746 /**
747 * Activates the given bucket number.
748 *
749 * @param bucket the bucket number to activate
750 */
751 private void activateBucket(int bucket) {
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800752 Queue<Runnable> tasks = flowTasks.remove(bucket);
Jordan Halterman356cda52018-06-15 17:46:28 -0700753 if (tasks != null) {
754 log.debug("Completing enqueued operations for device {}", deviceId);
755 tasks.forEach(task -> task.run());
756 }
757 }
758
759 /**
760 * Handles a lifecycle event.
761 */
762 private void onLifecycleEvent(LifecycleEvent event) {
763 log.debug("Received lifecycle event for device {}: {}", deviceId, event);
764 switch (event.type()) {
765 case TERM_START:
766 startTerm(event.subject());
767 break;
768 case TERM_ACTIVE:
769 activateTerm(event.subject());
770 break;
Jordan Halterman97cd2722018-06-22 16:58:08 -0700771 case TERM_UPDATE:
772 updateTerm(event.subject());
773 break;
Jordan Halterman356cda52018-06-15 17:46:28 -0700774 default:
775 break;
776 }
777 }
778
779 /**
780 * Handles a replica change at the start of a new term.
781 */
782 private void startTerm(DeviceReplicaInfo replicaInfo) {
783 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
784 this.replicaInfo = replicaInfo;
785 if (replicaInfo.isMaster(localNodeId)) {
786 log.info("Synchronizing device {} flows for term {}", deviceId, replicaInfo.term());
787 syncFlows(oldReplicaInfo, replicaInfo);
788 }
789 }
790
791 /**
792 * Handles the activation of a term.
793 */
794 private void activateTerm(DeviceReplicaInfo replicaInfo) {
795 if (replicaInfo.term() < this.replicaInfo.term()) {
796 return;
797 }
798 if (replicaInfo.term() > this.replicaInfo.term()) {
799 this.replicaInfo = replicaInfo;
800 }
801
802 // If the local node is neither the master or a backup for the device, clear the flow table.
803 if (!replicaInfo.isMaster(localNodeId) && !replicaInfo.isBackup(localNodeId)) {
804 flowBuckets.values().forEach(bucket -> bucket.clear());
805 }
806 activeTerm = replicaInfo.term();
807 }
808
809 /**
Jordan Halterman97cd2722018-06-22 16:58:08 -0700810 * Handles an update to a term.
811 */
812 private void updateTerm(DeviceReplicaInfo replicaInfo) {
Jordan Haltermanf1240a92019-03-07 15:52:34 -0800813 DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
814 if (oldReplicaInfo != null && replicaInfo.term() == oldReplicaInfo.term()) {
Jordan Halterman97cd2722018-06-22 16:58:08 -0700815 this.replicaInfo = replicaInfo;
816
817 // If the local node is neither the master or a backup for the device *and the term is active*,
818 // clear the flow table.
819 if (activeTerm == replicaInfo.term()
820 && !replicaInfo.isMaster(localNodeId)
821 && !replicaInfo.isBackup(localNodeId)) {
822 flowBuckets.values().forEach(bucket -> bucket.clear());
823 }
824 }
825 }
826
827 /**
Jordan Halterman356cda52018-06-15 17:46:28 -0700828 * Sends a message to the given node wrapped in a Lamport timestamp.
829 * <p>
830 * Messages are sent in a {@link Timestamped} wrapper and are expected to be received in a {@link Timestamped}
831 * wrapper. The internal {@link LogicalClock} is automatically updated on both send and receive.
832 *
833 * @param message the message to send
834 * @param subject the message subject
835 * @param toNodeId the node to which to send the message
836 * @param <M> the message type
837 * @param <R> the response type
838 * @return a future to be completed with the response
839 */
840 private <M, R> CompletableFuture<R> sendWithTimestamp(M message, MessageSubject subject, NodeId toNodeId) {
841 return clusterCommunicator.<Timestamped<M>, Timestamped<R>>sendAndReceive(
842 clock.timestamp(message), subject, SERIALIZER::encode, SERIALIZER::decode, toNodeId)
843 .thenApply(response -> {
844 clock.tick(response.timestamp());
845 return response.value();
846 });
847 }
848
849 /**
850 * Receives messages to the given subject wrapped in Lamport timestamps.
851 * <p>
852 * Messages are expected to be received in a {@link Timestamped} wrapper and are sent back in a {@link Timestamped}
853 * wrapper. The internal {@link LogicalClock} is automatically updated on both receive and send.
854 *
855 * @param subject the subject for which to register the subscriber
856 * @param function the raw message handler
857 * @param <M> the raw message type
858 * @param <R> the raw response type
859 */
860 private <M, R> void receiveWithTimestamp(MessageSubject subject, Function<M, R> function) {
861 clusterCommunicator.<Timestamped<M>, Timestamped<R>>addSubscriber(subject, SERIALIZER::decode, request -> {
862 clock.tick(request.timestamp());
863 return clock.timestamp(function.apply(request.value()));
Jordan Haltermanfc6d1fb2019-02-22 12:31:25 -0800864 }, SERIALIZER::encode, executor);
Jordan Halterman356cda52018-06-15 17:46:28 -0700865 }
866
867 /**
868 * Registers internal message subscribers.
869 */
870 private void registerSubscribers() {
871 receiveWithTimestamp(getDigestsSubject, v -> getDigests());
872 receiveWithTimestamp(getBucketSubject, this::onGetBucket);
873 receiveWithTimestamp(backupSubject, this::onBackup);
Jordan Halterman9b49e342019-04-17 11:05:16 -0700874 clusterCommunicator.<BucketId, Set<FlowEntry>>addSubscriber(
875 getFlowsSubject, SERIALIZER::decode, this::getFlowEntries, SERIALIZER::encode);
Jordan Halterman356cda52018-06-15 17:46:28 -0700876 }
877
878 /**
879 * Unregisters internal message subscribers.
880 */
881 private void unregisterSubscribers() {
882 clusterCommunicator.removeSubscriber(getDigestsSubject);
883 clusterCommunicator.removeSubscriber(getBucketSubject);
884 clusterCommunicator.removeSubscriber(backupSubject);
Jordan Halterman9b49e342019-04-17 11:05:16 -0700885 clusterCommunicator.removeSubscriber(getFlowsSubject);
Jordan Halterman356cda52018-06-15 17:46:28 -0700886 }
887
888 /**
889 * Adds internal event listeners.
890 */
891 private void addListeners() {
892 lifecycleManager.addListener(lifecycleEventListener);
893 }
894
895 /**
896 * Removes internal event listeners.
897 */
898 private void removeListeners() {
899 lifecycleManager.removeListener(lifecycleEventListener);
900 }
901
902 /**
903 * Cancels recurrent scheduled futures.
904 */
905 private synchronized void cancelFutures() {
Jordan Halterman356cda52018-06-15 17:46:28 -0700906 ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
907 if (antiEntropyFuture != null) {
908 antiEntropyFuture.cancel(false);
909 }
910 }
911
912 /**
913 * Closes the device flow table.
914 */
915 public void close() {
916 removeListeners();
917 unregisterSubscribers();
918 cancelFutures();
919 lifecycleManager.close();
920 }
921}