/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.flow.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flow table for all flows associated with a specific device.
 * <p>
 * Flows in the table are stored in buckets. Each bucket is mutated and replicated as a single unit. The device flow
 * table performs communication independently of other device flow tables for greater parallelism.
 * <p>
 * This implementation uses several different replication protocols. Changes that occur on the device master are
 * replicated to the backups provided in the {@link DeviceReplicaInfo} for the master's term. Additionally, a periodic
 * anti-entropy protocol is used to detect missing flows on backups (e.g. due to a node restart). Finally, when a
 * device mastership change occurs, the new master synchronizes flows with the prior master and/or backups for the
 * device, allowing mastership to be reassigned to non-backup nodes.
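 * <p>
 * A minimal usage sketch (non-authoritative illustration; the services, executors and periods are assumed to be
 * supplied by the caller):
 * <pre>{@code
 * DeviceFlowTable table = new DeviceFlowTable(
 *     deviceId, clusterService, clusterCommunicator, lifecycleManager,
 *     deviceService, scheduler, executor, backupPeriod, antiEntropyPeriod);
 * table.add(flowEntry).thenRun(() -> log.debug("Flow {} stored", flowEntry.id()));
 * }</pre>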
 */
public class DeviceFlowTable {
    private static final int NUM_BUCKETS = 128;
    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
        .register(KryoNamespaces.API)
        .register(BucketId.class)
        .register(FlowBucket.class)
        .register(FlowBucketDigest.class)
        .register(LogicalTimestamp.class)
        .register(Timestamped.class)
        .build());

    private final Logger log = LoggerFactory.getLogger(getClass());

    private final MessageSubject getDigestsSubject;
    private final MessageSubject getBucketSubject;
    private final MessageSubject backupSubject;
    private final MessageSubject getFlowsSubject;

    private final DeviceId deviceId;
    private final ClusterCommunicationService clusterCommunicator;
    private final DeviceService deviceService;
    private final LifecycleManager lifecycleManager;
    private final ScheduledExecutorService scheduler;
    private final Executor executor;
    private final NodeId localNodeId;

    private final LogicalClock clock = new LogicalClock();

    private volatile DeviceReplicaInfo replicaInfo;
    private volatile long activeTerm;

    private long backupPeriod;

    private final LifecycleEventListener lifecycleEventListener = new LifecycleEventListener() {
        @Override
        public void event(LifecycleEvent event) {
            executor.execute(() -> onLifecycleEvent(event));
        }
    };

    private ScheduledFuture<?> antiEntropyFuture;

    private final Map<Integer, Queue<Runnable>> flowTasks = Maps.newConcurrentMap();
    private final Map<Integer, FlowBucket> flowBuckets = Maps.newConcurrentMap();

    private final Map<BackupOperation, LogicalTimestamp> lastBackupTimes = Maps.newConcurrentMap();
    private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();

    DeviceFlowTable(
        DeviceId deviceId,
        ClusterService clusterService,
        ClusterCommunicationService clusterCommunicator,
        LifecycleManager lifecycleManager,
        DeviceService deviceService,
        ScheduledExecutorService scheduler,
        Executor executor,
        long backupPeriod,
        long antiEntropyPeriod) {
        this.deviceId = deviceId;
        this.clusterCommunicator = clusterCommunicator;
        this.lifecycleManager = lifecycleManager;
        this.deviceService = deviceService;
        this.scheduler = scheduler;
        this.executor = executor;
        this.localNodeId = clusterService.getLocalNode().id();
        this.replicaInfo = lifecycleManager.getReplicaInfo();

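        // Create all flow buckets up front so later lookups can assume every bucket already exists.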
        for (int i = 0; i < NUM_BUCKETS; i++) {
            flowBuckets.put(i, new FlowBucket(new BucketId(deviceId, i)));
        }

        getDigestsSubject = new MessageSubject(String.format("flow-store-%s-digests", deviceId));
        getBucketSubject = new MessageSubject(String.format("flow-store-%s-bucket", deviceId));
        backupSubject = new MessageSubject(String.format("flow-store-%s-backup", deviceId));
        getFlowsSubject = new MessageSubject(String.format("flow-store-%s-flows", deviceId));

        addListeners();

        setBackupPeriod(backupPeriod);
        setAntiEntropyPeriod(antiEntropyPeriod);
        registerSubscribers();

        scheduleBackups();

        activateMaster(replicaInfo);
    }

    /**
     * Sets the flow table backup period.
     *
     * @param backupPeriod the flow table backup period in milliseconds
     */
    synchronized void setBackupPeriod(long backupPeriod) {
        this.backupPeriod = backupPeriod;
    }

    /**
     * Sets the flow table anti-entropy period.
     *
     * @param antiEntropyPeriod the flow table anti-entropy period in milliseconds
     */
    synchronized void setAntiEntropyPeriod(long antiEntropyPeriod) {
        ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
        if (antiEntropyFuture != null) {
            antiEntropyFuture.cancel(false);
        }
        this.antiEntropyFuture = scheduler.scheduleAtFixedRate(
            () -> executor.execute(this::runAntiEntropy),
            antiEntropyPeriod,
            antiEntropyPeriod,
            TimeUnit.MILLISECONDS);
    }

    /**
     * Counts the flows in the table.
     *
     * @return the total number of flows in the table
     */
    public int count() {
        return flowBuckets.values().stream()
            .mapToInt(FlowBucket::count)
            .sum();
    }

    /**
     * Returns the flow entry for the given rule.
     *
     * @param rule the rule for which to lookup the flow entry
     * @return the flow entry for the given rule
     */
    public StoredFlowEntry getFlowEntry(FlowRule rule) {
        return getBucket(rule.id())
            .getFlowEntries(rule.id())
            .get(rule);
    }

    /**
     * Returns the set of flow entries in the table.
     *
     * @return a future to be completed with the set of flow entries in the table
     */
    public CompletableFuture<Iterable<FlowEntry>> getFlowEntries() {
        // Fetch the entries for each bucket in parallel and then concatenate the sets
        // to create a single iterable.
        return Tools.allOf(flowBuckets.values()
            .stream()
            .map(this::getFlowEntries)
            .collect(Collectors.toList()))
            .thenApply(Iterables::concat);
    }

    /**
     * Fetches the set of flow entries in the given bucket.
     *
     * @param bucketId the bucket for which to fetch flow entries
     * @return a future to be completed once the flow entries have been retrieved
     */
    private CompletableFuture<Set<FlowEntry>> getFlowEntries(BucketId bucketId) {
        return getFlowEntries(getBucket(bucketId.bucket()));
    }

    /**
     * Fetches the set of flow entries in the given bucket.
     *
     * @param bucket the bucket for which to fetch flow entries
     * @return a future to be completed once the flow entries have been retrieved
     */
    private CompletableFuture<Set<FlowEntry>> getFlowEntries(FlowBucket bucket) {
        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();

        // If the local node is the master, fetch the entries locally. Otherwise, request the entries
        // from the current master. Note that there's a chance of a brief cycle during a mastership change.
        if (replicaInfo.isMaster(localNodeId)) {
            return CompletableFuture.completedFuture(
                bucket.getFlowBucket().values().stream()
                    .flatMap(entries -> entries.values().stream())
                    .collect(Collectors.toSet()));
        } else if (replicaInfo.master() != null) {
            return clusterCommunicator.sendAndReceive(
                bucket.bucketId(),
                getFlowsSubject,
                SERIALIZER::encode,
                SERIALIZER::decode,
                replicaInfo.master());
        } else if (deviceService.isAvailable(deviceId)) {
            throw new RuntimeException("There is no master for available device " + deviceId);
        } else {
            return CompletableFuture.completedFuture(Collections.emptySet());
        }
    }

    /**
     * Returns the bucket for the given flow identifier.
     *
     * @param flowId the flow identifier
     * @return the bucket for the given flow identifier
     */
    private FlowBucket getBucket(FlowId flowId) {
        return getBucket(bucket(flowId));
    }

    /**
     * Returns the bucket with the given identifier.
     *
     * @param bucketId the bucket identifier
     * @return the bucket with the given identifier
     */
    private FlowBucket getBucket(int bucketId) {
        return flowBuckets.get(bucketId);
    }

    /**
     * Returns the bucket number for the given flow identifier.
     *
     * @param flowId the flow identifier
     * @return the bucket number for the given flow identifier
     */
    private int bucket(FlowId flowId) {
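        // Map the flow identifier onto one of NUM_BUCKETS buckets; Math.abs guards against negative flow IDs.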
        return Math.abs((int) (flowId.id() % NUM_BUCKETS));
    }

    /**
     * Returns the digests for all buckets in the flow table for the device.
     *
     * @return the set of digests for all buckets for the device
     */
    private Set<FlowBucketDigest> getDigests() {
        return flowBuckets.values()
            .stream()
            .map(bucket -> bucket.getDigest())
            .collect(Collectors.toSet());
    }

    /**
     * Returns the digest for the given bucket.
     *
     * @param bucket the bucket for which to return the digest
     * @return the digest for the given bucket
     */
    private FlowBucketDigest getDigest(int bucket) {
        return flowBuckets.get(bucket).getDigest();
    }

    /**
     * Adds an entry to the table.
     *
     * @param rule the rule to add
     * @return a future to be completed once the rule has been added
     */
    public CompletableFuture<Void> add(FlowEntry rule) {
        return runInTerm(rule.id(), (bucket, term) -> {
            bucket.add(rule, term, clock);
            return null;
        });
    }

    /**
     * Updates an entry in the table.
     *
     * @param rule the rule to update
     * @return a future to be completed once the rule has been updated
     */
    public CompletableFuture<Void> update(FlowEntry rule) {
        return runInTerm(rule.id(), (bucket, term) -> {
            bucket.update(rule, term, clock);
            return null;
        });
    }

    /**
     * Applies the given update function to the rule.
     *
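     * <p>
     * A hedged sketch of a caller-supplied update function (illustrative only; the state transition shown is an
     * assumption about the caller, not behavior of this class):
     * <pre>{@code
     * table.update(rule, stored -> {
     *     stored.setState(FlowEntry.FlowEntryState.PENDING_REMOVE);
     *     return stored;
     * });
     * }</pre>
     *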
     * @param rule the rule to update
     * @param function the update function to apply
     * @param <T> the result type
     * @return a future to be completed with the update result or {@code null} if the rule was not updated
     */
    public <T> CompletableFuture<T> update(FlowRule rule, Function<StoredFlowEntry, T> function) {
        return runInTerm(rule.id(), (bucket, term) -> bucket.update(rule, function, term, clock));
    }

    /**
     * Removes an entry from the table.
     *
     * @param rule the rule to remove
     * @return a future to be completed once the rule has been removed
     */
    public CompletableFuture<FlowEntry> remove(FlowEntry rule) {
        return runInTerm(rule.id(), (bucket, term) -> bucket.remove(rule, term, clock));
    }

    /**
     * Runs the given function in the current term.
     *
     * @param flowId the flow identifier indicating the bucket in which to run the function
     * @param function the function to execute in the current term
     * @param <T> the future result type
     * @return a future to be completed with the function result once it has been run
     */
    private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> function) {
        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
        if (!replicaInfo.isMaster(localNodeId)) {
            return Tools.exceptionalFuture(new IllegalStateException());
        }

        FlowBucket bucket = getBucket(flowId);

        // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
        // the change to be executed once the master has been synchronized.
        final long term = replicaInfo.term();
        CompletableFuture<T> future = new CompletableFuture<>();
        if (activeTerm < term) {
            log.debug("Enqueueing operation for device {}", deviceId);
            flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
                .add(() -> future.complete(apply(function, bucket, term)));
        } else {
            future.complete(apply(function, bucket, term));
        }
        return future;
    }

    /**
     * Applies the given function to the given bucket.
     *
     * @param function the function to apply
     * @param bucket the bucket to which to apply the function
     * @param term the term in which to apply the function
     * @param <T> the expected result type
     * @return the function result
     */
    private <T> T apply(BiFunction<FlowBucket, Long, T> function, FlowBucket bucket, long term) {
        synchronized (bucket) {
            return function.apply(bucket, term);
        }
    }

    /**
     * Schedules bucket backups.
     */
    private void scheduleBackups() {
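        // Back up every bucket once up front; when each initial backup completes, schedule that bucket's next
        // backup to run after the configured backup period.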
        flowBuckets.values().forEach(bucket -> backupBucket(bucket).whenComplete((result, error) -> {
            scheduleBackup(bucket);
        }));
    }

    /**
     * Schedules a backup for the given bucket.
     *
     * @param bucket the bucket for which to schedule the backup
     */
    private void scheduleBackup(FlowBucket bucket) {
        scheduler.schedule(
            () -> executor.execute(() -> backupBucket(bucket)),
            backupPeriod,
            TimeUnit.MILLISECONDS);
    }

    /**
     * Backs up all buckets for the device to its backup nodes.
     */
    private CompletableFuture<Void> backupAll() {
        CompletableFuture<?>[] futures = flowBuckets.values()
            .stream()
            .map(bucket -> backupBucket(bucket))
            .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }

    /**
     * Backs up the given flow bucket.
     *
     * @param bucket the flow bucket to backup
     * @return a future to be completed once the bucket has been backed up
     */
    private CompletableFuture<Void> backupBucket(FlowBucket bucket) {
        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();

        // Only replicate if the bucket's term matches the replica term and the local node is the current master.
        // This ensures that the bucket has been synchronized prior to a new master replicating changes to backups.
        if (bucket.term() == replicaInfo.term() && replicaInfo.isMaster(localNodeId)) {
            // Replicate the bucket to each of the backup nodes.
            CompletableFuture<?>[] futures = replicaInfo.backups()
                .stream()
                .map(nodeId -> backupBucketToNode(bucket, nodeId))
                .toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(futures);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Backs up the given flow bucket to the given node.
     *
     * @param bucket the bucket to backup
     * @param nodeId the node to which to back up the bucket
     * @return a future to be completed once the bucket has been backed up
     */
    private CompletableFuture<Void> backupBucketToNode(FlowBucket bucket, NodeId nodeId) {
        // Record the logical timestamp from the bucket to keep track of the highest logical time replicated.
        LogicalTimestamp timestamp = bucket.timestamp();

        // If the backup can be run (no concurrent backup to the node in progress) then run it.
        BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
        if (startBackup(operation, timestamp)) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            backup(bucket, nodeId).whenCompleteAsync((succeeded, error) -> {
                if (error != null) {
                    log.debug("Backup operation {} failed", operation, error);
                    failBackup(operation);
                } else if (succeeded) {
                    succeedBackup(operation, timestamp);
                } else {
                    log.debug("Backup operation {} failed: term mismatch", operation);
                    failBackup(operation);
                }
                future.complete(null);
            }, executor);
            return future;
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
     * <p>
     * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
     * are pending replication for the backup operation.
     *
     * @param operation the operation to start
     * @param timestamp the timestamp for which to start the backup operation
     * @return indicates whether the given backup operation should be started
     */
    private boolean startBackup(BackupOperation operation, LogicalTimestamp timestamp) {
        LogicalTimestamp lastBackupTime = lastBackupTimes.get(operation);
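        // Proceed only if the bucket has changed since the last successful backup to this node and no backup for
        // the same node/bucket is already in flight.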
        return timestamp != null
            && (lastBackupTime == null || lastBackupTime.isOlderThan(timestamp))
            && inFlightUpdates.add(operation);
    }

    /**
     * Fails the given backup operation.
     *
     * @param operation the backup operation to fail
     */
    private void failBackup(BackupOperation operation) {
        inFlightUpdates.remove(operation);
    }

    /**
     * Succeeds the given backup operation.
     * <p>
     * The last backup time for the operation will be updated and the operation will be removed from
     * in-flight updates.
     *
     * @param operation the operation to succeed
     * @param timestamp the timestamp at which the operation was <em>started</em>
     */
    private void succeedBackup(BackupOperation operation, LogicalTimestamp timestamp) {
        lastBackupTimes.put(operation, timestamp);
        inFlightUpdates.remove(operation);
    }

    /**
     * Resets the last completion time for the given backup operation to ensure it's replicated again.
     *
     * @param operation the backup operation to reset
     */
    private void resetBackup(BackupOperation operation) {
        lastBackupTimes.remove(operation);
    }

    /**
     * Performs the given backup operation.
     *
     * @param bucket the bucket to backup
     * @param nodeId the node to which to backup the bucket
     * @return a future to be completed with a boolean indicating whether the backup operation was successful
     */
    private CompletableFuture<Boolean> backup(FlowBucket bucket, NodeId nodeId) {
        if (log.isDebugEnabled()) {
            log.debug("Backing up {} flow entries in bucket {} to {}", bucket.count(), bucket.bucketId(), nodeId);
        }
        synchronized (bucket) {
            return sendWithTimestamp(bucket, backupSubject, nodeId);
        }
    }

    /**
     * Handles a flow bucket backup from a remote peer.
     *
     * @param flowBucket the flow bucket to back up
     * @return {@code true} if the backup was accepted, otherwise {@code false}
     */
    private boolean onBackup(FlowBucket flowBucket) {
        if (log.isDebugEnabled()) {
            log.debug("{} - Received {} flow entries in bucket {} to backup",
                deviceId, flowBucket.count(), flowBucket.bucketId());
        }

        try {
            DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();

            // If the backup is for a different term, reject the request until we learn about the new term.
            if (flowBucket.term() != replicaInfo.term()) {
                log.debug("Term mismatch for device {}: {} != {}", deviceId, flowBucket.term(), replicaInfo);
                return false;
            }

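            // Replace the local bucket only if the backup is newer according to its digest.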
            flowBuckets.compute(flowBucket.bucketId().bucket(),
                (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
            return true;
        } catch (Exception e) {
            log.warn("Failure processing backup request", e);
            return false;
        }
    }

    /**
     * Runs the anti-entropy protocol.
     */
    private void runAntiEntropy() {
        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
        if (!replicaInfo.isMaster(localNodeId)) {
            return;
        }

        for (NodeId nodeId : replicaInfo.backups()) {
            runAntiEntropy(nodeId);
        }
    }

    /**
     * Runs the anti-entropy protocol against the given peer.
     *
     * @param nodeId the node with which to execute the anti-entropy protocol
     */
    private void runAntiEntropy(NodeId nodeId) {
        backupAll().whenCompleteAsync((result, error) -> {
            requestDigests(nodeId).thenAcceptAsync((digests) -> {
                // Compute a set of missing BucketIds based on digest times and send them back to the master.
                for (FlowBucketDigest remoteDigest : digests) {
                    FlowBucket localBucket = getBucket(remoteDigest.bucket());
                    if (localBucket.getDigest().isNewerThan(remoteDigest)) {
                        log.debug("Detected missing flow entries on node {} in bucket {}/{}",
                            nodeId, deviceId, remoteDigest.bucket());
                        resetBackup(new BackupOperation(nodeId, remoteDigest.bucket()));
                    }
                }
            }, executor);
        }, executor);
    }

    /**
     * Sends a digest request to the given node.
     *
     * @param nodeId the node to which to send the request
     * @return future to be completed with the set of digests for the given device on the given node
     */
    private CompletableFuture<Set<FlowBucketDigest>> requestDigests(NodeId nodeId) {
        return sendWithTimestamp(deviceId, getDigestsSubject, nodeId);
    }

    /**
     * Synchronizes flows from the previous master or backups.
     *
     * @param prevReplicaInfo the previous replica info
     * @param newReplicaInfo the new replica info
     */
    private void syncFlows(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
        if (prevReplicaInfo == null) {
            activateMaster(newReplicaInfo);
        } else if (prevReplicaInfo.master() != null && !prevReplicaInfo.master().equals(localNodeId)) {
            syncFlowsOnMaster(prevReplicaInfo, newReplicaInfo);
        } else {
            syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
        }
    }

    /**
     * Synchronizes flows from the previous master, falling back to backups if the master fails.
     *
     * @param prevReplicaInfo the previous replica info
     * @param newReplicaInfo the new replica info
     */
    private void syncFlowsOnMaster(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
        syncFlowsOn(prevReplicaInfo.master())
            .whenCompleteAsync((result, error) -> {
                if (error != null) {
                    log.debug("Failed to synchronize flows on previous master {}", prevReplicaInfo.master(), error);
                    syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
                } else {
                    activateMaster(newReplicaInfo);
                }
            }, executor);
    }

    /**
     * Synchronizes flows from the previous backups.
     *
     * @param prevReplicaInfo the previous replica info
     * @param newReplicaInfo the new replica info
     */
    private void syncFlowsOnBackups(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
        List<NodeId> backups = prevReplicaInfo.backups()
            .stream()
            .filter(nodeId -> !nodeId.equals(localNodeId))
            .collect(Collectors.toList());
        syncFlowsOn(backups)
            .whenCompleteAsync((result, error) -> {
                if (error != null) {
                    log.debug("Failed to synchronize flows on previous backup nodes {}", backups, error);
                }
                activateMaster(newReplicaInfo);
            }, executor);
    }

    /**
     * Synchronizes flows for the device on the given nodes.
     *
     * @param nodes the nodes via which to synchronize the flows
     * @return a future to be completed once the flows have been synchronized
     */
    private CompletableFuture<Void> syncFlowsOn(Collection<NodeId> nodes) {
        return nodes.isEmpty()
            ? CompletableFuture.completedFuture(null)
            : Tools.firstOf(nodes.stream()
                .map(node -> syncFlowsOn(node))
                .collect(Collectors.toList()))
                .thenApply(v -> null);
    }

    /**
     * Synchronizes flows for the device from the given node.
     *
     * @param nodeId the node from which to synchronize flows
     * @return a future to be completed once the flows have been synchronized
     */
    private CompletableFuture<Void> syncFlowsOn(NodeId nodeId) {
        return requestDigests(nodeId)
            .thenCompose(digests -> Tools.allOf(digests.stream()
                .filter(digest -> digest.isNewerThan(getDigest(digest.bucket())))
                .map(digest -> syncBucketOn(nodeId, digest.bucket()))
                .collect(Collectors.toList())))
            .thenApply(v -> null);
    }

    /**
     * Synchronizes the given bucket on the given node.
     *
     * @param nodeId the node on which to synchronize the bucket
     * @param bucketNumber the bucket to synchronize
     * @return a future to be completed once the bucket has been synchronized
     */
    private CompletableFuture<Void> syncBucketOn(NodeId nodeId, int bucketNumber) {
        return requestBucket(nodeId, bucketNumber)
            .thenAcceptAsync(flowBucket -> {
                flowBuckets.compute(flowBucket.bucketId().bucket(),
                    (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
            }, executor);
    }

    /**
     * Requests the given bucket from the given node.
     *
     * @param nodeId the node from which to request the bucket
     * @param bucket the bucket to request
     * @return a future to be completed with the bucket
     */
    private CompletableFuture<FlowBucket> requestBucket(NodeId nodeId, int bucket) {
        log.debug("Requesting flow bucket {} from {}", bucket, nodeId);
        return sendWithTimestamp(bucket, getBucketSubject, nodeId);
    }

    /**
     * Handles a flow bucket request.
     *
     * @param bucketId the bucket number
     * @return the flow bucket
     */
    private FlowBucket onGetBucket(int bucketId) {
        return flowBuckets.get(bucketId).copy();
    }

    /**
     * Activates the new master term.
     *
     * @param replicaInfo the new replica info
     */
    private void activateMaster(DeviceReplicaInfo replicaInfo) {
        if (replicaInfo.isMaster(localNodeId)) {
            log.debug("Activating term {} for device {}", replicaInfo.term(), deviceId);
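            // Flush any operations that were enqueued while the term was synchronizing, then mark the term active.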
            for (int i = 0; i < NUM_BUCKETS; i++) {
                activateBucket(i);
            }
            lifecycleManager.activate(replicaInfo.term());
            activeTerm = replicaInfo.term();
        }
    }

    /**
     * Activates the given bucket number.
     *
     * @param bucket the bucket number to activate
     */
    private void activateBucket(int bucket) {
        Queue<Runnable> tasks = flowTasks.remove(bucket);
        if (tasks != null) {
            log.debug("Completing enqueued operations for device {}", deviceId);
            tasks.forEach(task -> task.run());
        }
    }

    /**
     * Handles a lifecycle event.
     */
    private void onLifecycleEvent(LifecycleEvent event) {
        log.debug("Received lifecycle event for device {}: {}", deviceId, event);
        switch (event.type()) {
            case TERM_START:
                startTerm(event.subject());
                break;
            case TERM_ACTIVE:
                activateTerm(event.subject());
                break;
            case TERM_UPDATE:
                updateTerm(event.subject());
                break;
            default:
                break;
        }
    }

    /**
     * Handles a replica change at the start of a new term.
     */
    private void startTerm(DeviceReplicaInfo replicaInfo) {
        DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
        this.replicaInfo = replicaInfo;
        if (replicaInfo.isMaster(localNodeId)) {
            log.info("Synchronizing device {} flows for term {}", deviceId, replicaInfo.term());
            syncFlows(oldReplicaInfo, replicaInfo);
        }
    }

    /**
     * Handles the activation of a term.
     */
    private void activateTerm(DeviceReplicaInfo replicaInfo) {
        if (replicaInfo.term() < this.replicaInfo.term()) {
            return;
        }
        if (replicaInfo.term() > this.replicaInfo.term()) {
            this.replicaInfo = replicaInfo;
        }

        // If the local node is neither the master nor a backup for the device, clear the flow table.
        if (!replicaInfo.isMaster(localNodeId) && !replicaInfo.isBackup(localNodeId)) {
            flowBuckets.values().forEach(bucket -> bucket.clear());
        }
        activeTerm = replicaInfo.term();
    }

    /**
     * Handles an update to a term.
     */
    private void updateTerm(DeviceReplicaInfo replicaInfo) {
        DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
        if (oldReplicaInfo != null && replicaInfo.term() == oldReplicaInfo.term()) {
            this.replicaInfo = replicaInfo;

            // If the local node is neither the master nor a backup for the device *and the term is active*,
            // clear the flow table.
            if (activeTerm == replicaInfo.term()
                && !replicaInfo.isMaster(localNodeId)
                && !replicaInfo.isBackup(localNodeId)) {
                flowBuckets.values().forEach(bucket -> bucket.clear());
            }
        }
    }

    /**
     * Sends a message to the given node wrapped in a Lamport timestamp.
     * <p>
     * Messages are sent in a {@link Timestamped} wrapper and are expected to be received in a {@link Timestamped}
     * wrapper. The internal {@link LogicalClock} is automatically updated on both send and receive.
     *
     * @param message the message to send
     * @param subject the message subject
     * @param toNodeId the node to which to send the message
     * @param <M> the message type
     * @param <R> the response type
     * @return a future to be completed with the response
     */
    private <M, R> CompletableFuture<R> sendWithTimestamp(M message, MessageSubject subject, NodeId toNodeId) {
        return clusterCommunicator.<Timestamped<M>, Timestamped<R>>sendAndReceive(
            clock.timestamp(message), subject, SERIALIZER::encode, SERIALIZER::decode, toNodeId)
            .thenApply(response -> {
                clock.tick(response.timestamp());
                return response.value();
            });
    }

    /**
     * Receives messages to the given subject wrapped in Lamport timestamps.
     * <p>
     * Messages are expected to be received in a {@link Timestamped} wrapper and are sent back in a {@link Timestamped}
     * wrapper. The internal {@link LogicalClock} is automatically updated on both receive and send.
     *
     * @param subject the subject for which to register the subscriber
     * @param function the raw message handler
     * @param <M> the raw message type
     * @param <R> the raw response type
     */
    private <M, R> void receiveWithTimestamp(MessageSubject subject, Function<M, R> function) {
        clusterCommunicator.<Timestamped<M>, Timestamped<R>>addSubscriber(subject, SERIALIZER::decode, request -> {
            clock.tick(request.timestamp());
            return clock.timestamp(function.apply(request.value()));
        }, SERIALIZER::encode, executor);
    }

    /**
     * Registers internal message subscribers.
     */
    private void registerSubscribers() {
        receiveWithTimestamp(getDigestsSubject, v -> getDigests());
        receiveWithTimestamp(getBucketSubject, this::onGetBucket);
        receiveWithTimestamp(backupSubject, this::onBackup);
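        // Unlike the subjects above, flow fetches are served directly without the Lamport timestamp wrapper.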
        clusterCommunicator.<BucketId, Set<FlowEntry>>addSubscriber(
            getFlowsSubject, SERIALIZER::decode, this::getFlowEntries, SERIALIZER::encode);
    }

    /**
     * Unregisters internal message subscribers.
     */
    private void unregisterSubscribers() {
        clusterCommunicator.removeSubscriber(getDigestsSubject);
        clusterCommunicator.removeSubscriber(getBucketSubject);
        clusterCommunicator.removeSubscriber(backupSubject);
        clusterCommunicator.removeSubscriber(getFlowsSubject);
    }

    /**
     * Adds internal event listeners.
     */
    private void addListeners() {
        lifecycleManager.addListener(lifecycleEventListener);
    }

    /**
     * Removes internal event listeners.
     */
    private void removeListeners() {
        lifecycleManager.removeListener(lifecycleEventListener);
    }

    /**
     * Cancels recurrent scheduled futures.
     */
    private synchronized void cancelFutures() {
        ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
        if (antiEntropyFuture != null) {
            antiEntropyFuture.cancel(false);
        }
    }

    /**
     * Closes the device flow table.
     */
    public void close() {
        removeListeners();
        unregisterSubscribers();
        cancelFutures();
        lifecycleManager.close();
    }
}