/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.flow.impl;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.FlowRuleStoreException;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flow table for all flows associated with a specific device.
 * <p>
 * Flows in the table are stored in buckets. Each bucket is mutated and replicated as a single unit. The device flow
 * table performs communication independent of other device flow tables for more parallelism.
 * <p>
 * This implementation uses several different replication protocols. Changes that occur on the device master are
 * replicated to the backups provided in the {@link DeviceReplicaInfo} for the master's term. Additionally, a periodic
 * anti-entropy protocol is used to detect missing flows on backups (e.g. due to a node restart). Finally, when a
 * device mastership change occurs, the new master synchronizes flows with the prior master and/or backups for the
 * device, allowing mastership to be reassigned to non-backup nodes.
 */
public class DeviceFlowTable {
    private static final int NUM_BUCKETS = 128;
    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
        .register(KryoNamespaces.API)
        .register(BucketId.class)
        .register(FlowBucket.class)
        .register(FlowBucketDigest.class)
        .register(LogicalTimestamp.class)
        .register(Timestamped.class)
        .build());
    private static final int GET_FLOW_ENTRIES_TIMEOUT = 15; // seconds

    private final Logger log = LoggerFactory.getLogger(getClass());

    private final MessageSubject getDigestsSubject;
    private final MessageSubject getBucketSubject;
    private final MessageSubject backupSubject;
    private final MessageSubject getFlowsSubject;

    private final DeviceId deviceId;
    private final ClusterCommunicationService clusterCommunicator;
    private final ClusterService clusterService;
    private final DeviceService deviceService;
    private final LifecycleManager lifecycleManager;
    private final ScheduledExecutorService scheduler;
    private final Executor executor;
    private final NodeId localNodeId;

    private final LogicalClock clock = new LogicalClock();

    private volatile DeviceReplicaInfo replicaInfo;
    private volatile long activeTerm;

    private long backupPeriod;

    private final LifecycleEventListener lifecycleEventListener = new LifecycleEventListener() {
        @Override
        public void event(LifecycleEvent event) {
            executor.execute(() -> onLifecycleEvent(event));
        }
    };

    private ScheduledFuture<?> antiEntropyFuture;

    private final Map<Integer, Queue<Runnable>> flowTasks = Maps.newConcurrentMap();
    private final Map<Integer, FlowBucket> flowBuckets = Maps.newConcurrentMap();

    private final Map<BackupOperation, LogicalTimestamp> lastBackupTimes = Maps.newConcurrentMap();
    private final Set<BackupOperation> inFlightUpdates = Sets.newConcurrentHashSet();

    DeviceFlowTable(
        DeviceId deviceId,
        ClusterService clusterService,
        ClusterCommunicationService clusterCommunicator,
        LifecycleManager lifecycleManager,
        DeviceService deviceService,
        ScheduledExecutorService scheduler,
        Executor executor,
        long backupPeriod,
        long antiEntropyPeriod) {
        this.deviceId = deviceId;
        this.clusterCommunicator = clusterCommunicator;
        this.clusterService = clusterService;
        this.lifecycleManager = lifecycleManager;
        this.deviceService = deviceService;
        this.scheduler = scheduler;
        this.executor = executor;
        this.localNodeId = clusterService.getLocalNode().id();
        this.replicaInfo = lifecycleManager.getReplicaInfo();

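        // Pre-populate every bucket so flow lookups never encounter a missing bucket.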
        for (int i = 0; i < NUM_BUCKETS; i++) {
            flowBuckets.put(i, new FlowBucket(new BucketId(deviceId, i)));
        }

        getDigestsSubject = new MessageSubject(String.format("flow-store-%s-digests", deviceId));
        getBucketSubject = new MessageSubject(String.format("flow-store-%s-bucket", deviceId));
        backupSubject = new MessageSubject(String.format("flow-store-%s-backup", deviceId));
        getFlowsSubject = new MessageSubject(String.format("flow-store-%s-flows", deviceId));

        addListeners();

        setBackupPeriod(backupPeriod);
        setAntiEntropyPeriod(antiEntropyPeriod);
        registerSubscribers();

        scheduleBackups();

        activateMaster(replicaInfo);
    }

    /**
     * Sets the flow table backup period.
     *
     * @param backupPeriod the flow table backup period in milliseconds
     */
    synchronized void setBackupPeriod(long backupPeriod) {
        this.backupPeriod = backupPeriod;
    }

    /**
     * Sets the flow table anti-entropy period.
     *
     * @param antiEntropyPeriod the flow table anti-entropy period in milliseconds
     */
    synchronized void setAntiEntropyPeriod(long antiEntropyPeriod) {
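        // Cancel any previously scheduled anti-entropy task before rescheduling it at the new period.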
        ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
        if (antiEntropyFuture != null) {
            antiEntropyFuture.cancel(false);
        }
        this.antiEntropyFuture = scheduler.scheduleAtFixedRate(
            () -> executor.execute(this::runAntiEntropy),
            antiEntropyPeriod,
            antiEntropyPeriod,
            TimeUnit.MILLISECONDS);
    }

    /**
     * Counts the flows in the table.
     *
     * @return the total number of flows in the table
     */
    public int count() {
        return flowBuckets.values().stream()
            .mapToInt(FlowBucket::count)
            .sum();
    }

    /**
     * Returns the flow entry for the given rule.
     *
     * @param rule the rule for which to lookup the flow entry
     * @return the flow entry for the given rule
     */
    public StoredFlowEntry getFlowEntry(FlowRule rule) {
        return getBucket(rule.id())
            .getFlowEntries(rule.id())
            .get(rule);
    }

    /**
     * Returns the set of flow entries in the table.
     *
     * @return the set of flow entries in the table
     */
    public CompletableFuture<Iterable<FlowEntry>> getFlowEntries() {
        // Fetch the entries for each bucket in parallel and then concatenate the sets
        // to create a single iterable.
        return Tools.allOf(flowBuckets.values()
            .stream()
            .map(this::getFlowEntries)
            .collect(Collectors.toList()))
            .thenApply(Iterables::concat);
    }

    /**
     * Fetches the set of flow entries in the given bucket.
     *
     * @param bucketId the bucket for which to fetch flow entries
     * @return a future to be completed once the flow entries have been retrieved
     */
    private CompletableFuture<Set<FlowEntry>> getFlowEntries(BucketId bucketId) {
        return getFlowEntries(getBucket(bucketId.bucket()));
    }

    /**
     * Fetches the set of flow entries in the given bucket.
     *
     * @param bucket the bucket for which to fetch flow entries
     * @return a future to be completed once the flow entries have been retrieved
     */
    private CompletableFuture<Set<FlowEntry>> getFlowEntries(FlowBucket bucket) {
        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
        // If the local node is the master, fetch the entries locally. Otherwise, request the entries
        // from the current master. Note that there's a chance of a brief cycle during a mastership change.
        if (replicaInfo.isMaster(localNodeId)) {
            return CompletableFuture.completedFuture(
                bucket.getFlowBucket().values().stream()
                    .flatMap(entries -> entries.values().stream())
                    .collect(Collectors.toSet()));
        } else if (replicaInfo.master() != null) {
            return clusterCommunicator.sendAndReceive(
                bucket.bucketId(),
                getFlowsSubject,
                SERIALIZER::encode,
                SERIALIZER::decode,
                replicaInfo.master(),
                Duration.ofSeconds(GET_FLOW_ENTRIES_TIMEOUT));
        } else if (deviceService.isAvailable(deviceId)) {
            throw new FlowRuleStoreException("There is no master for available device " + deviceId);
        } else if (clusterService.getNodes().size() <= 1 + ECFlowRuleStore.backupCount) {
            //TODO remove this check when [ONOS-8080] is fixed
            // When the device is not available and has no master, and the cluster size does not exceed
            // the master plus the guaranteed backup count, every node is guaranteed to hold a replica,
            // so this node can safely serve its local copy.
            // -- DISCLAIMER --
            // The backup count must be set manually for clusters of more than 3 nodes;
            // the default is 2, which covers the single-instance and 3-node scenarios.
            return CompletableFuture.completedFuture(
                bucket.getFlowBucket().values().stream()
                    .flatMap(entries -> entries.values().stream())
                    .collect(Collectors.toSet()));
        } else {
            return CompletableFuture.completedFuture(Collections.emptySet());
        }
    }

    /**
     * Returns the bucket for the given flow identifier.
     *
     * @param flowId the flow identifier
     * @return the bucket for the given flow identifier
     */
    private FlowBucket getBucket(FlowId flowId) {
        return getBucket(bucket(flowId));
    }

    /**
     * Returns the bucket with the given identifier.
     *
     * @param bucketId the bucket identifier
     * @return the bucket with the given identifier
     */
    private FlowBucket getBucket(int bucketId) {
        return flowBuckets.get(bucketId);
    }

    /**
     * Returns the bucket number for the given flow identifier.
     *
     * @param flowId the flow identifier
     * @return the bucket number for the given flow identifier
     */
    private int bucket(FlowId flowId) {
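        // Map the flow ID onto one of NUM_BUCKETS buckets; Math.abs guards against negative remainders.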
        return Math.abs((int) (flowId.id() % NUM_BUCKETS));
    }

    /**
     * Returns the digests for all buckets in the flow table for the device.
     *
     * @return the set of digests for all buckets for the device
     */
    private Set<FlowBucketDigest> getDigests() {
        return flowBuckets.values()
            .stream()
            .map(bucket -> bucket.getDigest())
            .collect(Collectors.toSet());
    }

    /**
     * Returns the digest for the given bucket.
     *
     * @param bucket the bucket for which to return the digest
     * @return the digest for the given bucket
     */
    private FlowBucketDigest getDigest(int bucket) {
        return flowBuckets.get(bucket).getDigest();
    }

    /**
     * Adds an entry to the table.
     *
     * @param rule the rule to add
     * @return a future to be completed once the rule has been added
     */
    public CompletableFuture<Void> add(FlowEntry rule) {
        return runInTerm(rule.id(), (bucket, term) -> {
            bucket.add(rule, term, clock);
            return null;
        });
    }

    /**
     * Updates an entry in the table.
     *
     * @param rule the rule to update
     * @return a future to be completed once the rule has been updated
     */
    public CompletableFuture<Void> update(FlowEntry rule) {
        return runInTerm(rule.id(), (bucket, term) -> {
            bucket.update(rule, term, clock);
            return null;
        });
    }

    /**
     * Applies the given update function to the rule.
     *
     * @param rule the rule to update
     * @param function the update function to apply
     * @param <T> the result type
     * @return a future to be completed with the update result or {@code null} if the rule was not updated
     */
    public <T> CompletableFuture<T> update(FlowRule rule, Function<StoredFlowEntry, T> function) {
        return runInTerm(rule.id(), (bucket, term) -> bucket.update(rule, function, term, clock));
    }

    /**
     * Removes an entry from the table.
     *
     * @param rule the rule to remove
     * @return a future to be completed once the rule has been removed
     */
    public CompletableFuture<FlowEntry> remove(FlowEntry rule) {
        return runInTerm(rule.id(), (bucket, term) -> bucket.remove(rule, term, clock));
    }

    /**
     * Runs the given function in the current term.
     *
     * @param flowId the flow identifier indicating the bucket in which to run the function
     * @param function the function to execute in the current term
     * @param <T> the future result type
     * @return a future to be completed with the function result once it has been run
     */
    private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> function) {
        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
        if (!replicaInfo.isMaster(localNodeId)) {
            return Tools.exceptionalFuture(new IllegalStateException());
        }

        FlowBucket bucket = getBucket(flowId);

        // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
        // the change to be executed once the master has been synchronized.
        final long term = replicaInfo.term();
        CompletableFuture<T> future = new CompletableFuture<>();
        if (activeTerm < term) {
            log.debug("Enqueueing operation for device {}", deviceId);
            flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
                .add(() -> future.complete(apply(function, bucket, term)));
        } else {
            future.complete(apply(function, bucket, term));
        }
        return future;
    }

    /**
     * Applies the given function to the given bucket.
     *
     * @param function the function to apply
     * @param bucket the bucket to which to apply the function
     * @param term the term in which to apply the function
     * @param <T> the expected result type
     * @return the function result
     */
    private <T> T apply(BiFunction<FlowBucket, Long, T> function, FlowBucket bucket, long term) {
        synchronized (bucket) {
            return function.apply(bucket, term);
        }
    }

    /**
     * Schedules bucket backups.
     */
    private void scheduleBackups() {
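        // Kick off an immediate backup of each bucket and schedule the next periodic backup once it completes.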
        flowBuckets.values().forEach(bucket -> backupBucket(bucket).whenComplete((result, error) -> {
            scheduleBackup(bucket);
        }));
    }

    /**
     * Schedules a backup for the given bucket.
     *
     * @param bucket the bucket for which to schedule the backup
     */
    private void scheduleBackup(FlowBucket bucket) {
        scheduler.schedule(
            () -> executor.execute(() -> backupBucket(bucket)),
            backupPeriod,
            TimeUnit.MILLISECONDS);
    }

    /**
     * Backs up all buckets in the flow table to the device's backup nodes.
     */
    private CompletableFuture<Void> backupAll() {
        CompletableFuture<?>[] futures = flowBuckets.values()
            .stream()
            .map(bucket -> backupBucket(bucket))
            .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }

    /**
     * Backs up the given flow bucket.
     *
     * @param bucket the flow bucket to backup
     */
    private CompletableFuture<Void> backupBucket(FlowBucket bucket) {
        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();

        // Only replicate if the bucket's term matches the replica term and the local node is the current master.
        // This ensures that the bucket has been synchronized prior to a new master replicating changes to backups.
        if (bucket.term() == replicaInfo.term() && replicaInfo.isMaster(localNodeId)) {
            // Replicate the bucket to each of the backup nodes.
            CompletableFuture<?>[] futures = replicaInfo.backups()
                .stream()
                .map(nodeId -> backupBucketToNode(bucket, nodeId))
                .toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(futures);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Backs up the given flow bucket to the given node.
     *
     * @param bucket the bucket to backup
     * @param nodeId the node to which to back up the bucket
     * @return a future to be completed once the bucket has been backed up
     */
    private CompletableFuture<Void> backupBucketToNode(FlowBucket bucket, NodeId nodeId) {
        // Record the logical timestamp from the bucket to keep track of the highest logical time replicated.
        LogicalTimestamp timestamp = bucket.timestamp();

        // If the backup can be run (no concurrent backup to the node in progress) then run it.
        BackupOperation operation = new BackupOperation(nodeId, bucket.bucketId().bucket());
        if (startBackup(operation, timestamp)) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            backup(bucket, nodeId).whenCompleteAsync((succeeded, error) -> {
                if (error != null) {
                    log.debug("Backup operation {} failed", operation, error);
                    failBackup(operation);
                } else if (succeeded) {
                    succeedBackup(operation, timestamp);
                } else {
                    log.debug("Backup operation {} failed: term mismatch", operation);
                    failBackup(operation);
                }
                future.complete(null);
            }, executor);
            return future;
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Returns a boolean indicating whether the given {@link BackupOperation} can be started.
     * <p>
     * The backup can be started if no backup for the same device/bucket/node is already in progress and changes
     * are pending replication for the backup operation.
     *
     * @param operation the operation to start
     * @param timestamp the timestamp for which to start the backup operation
     * @return indicates whether the given backup operation should be started
     */
    private boolean startBackup(BackupOperation operation, LogicalTimestamp timestamp) {
        LogicalTimestamp lastBackupTime = lastBackupTimes.get(operation);
        return timestamp != null
            && (lastBackupTime == null || lastBackupTime.isOlderThan(timestamp))
            && inFlightUpdates.add(operation);
    }

    /**
     * Fails the given backup operation.
     *
     * @param operation the backup operation to fail
     */
    private void failBackup(BackupOperation operation) {
        inFlightUpdates.remove(operation);
    }

    /**
     * Succeeds the given backup operation.
     * <p>
     * The last backup time for the operation will be updated and the operation will be removed from
     * in-flight updates.
     *
     * @param operation the operation to succeed
     * @param timestamp the timestamp at which the operation was <em>started</em>
     */
    private void succeedBackup(BackupOperation operation, LogicalTimestamp timestamp) {
        lastBackupTimes.put(operation, timestamp);
        inFlightUpdates.remove(operation);
    }

    /**
     * Resets the last completion time for the given backup operation to ensure it's replicated again.
     *
     * @param operation the backup operation to reset
     */
    private void resetBackup(BackupOperation operation) {
        lastBackupTimes.remove(operation);
    }

    /**
     * Performs the given backup operation.
     *
     * @param bucket the bucket to backup
     * @param nodeId the node to which to backup the bucket
     * @return a future to be completed with a boolean indicating whether the backup operation was successful
     */
    private CompletableFuture<Boolean> backup(FlowBucket bucket, NodeId nodeId) {
        if (log.isDebugEnabled()) {
            log.debug("Backing up {} flow entries in bucket {} to {}", bucket.count(), bucket.bucketId(), nodeId);
        }
        synchronized (bucket) {
            return sendWithTimestamp(bucket, backupSubject, nodeId);
        }
    }

    /**
     * Handles a flow bucket backup from a remote peer.
     *
     * @param flowBucket the flow bucket to back up
     * @return a boolean indicating whether the backup was successfully applied
     */
    private boolean onBackup(FlowBucket flowBucket) {
        if (log.isDebugEnabled()) {
            log.debug("{} - Received {} flow entries in bucket {} to backup",
                deviceId, flowBucket.count(), flowBucket.bucketId());
        }

        try {
            DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();

            // If the backup is for a different term, reject the request until we learn about the new term.
            if (flowBucket.term() != replicaInfo.term()) {
                log.debug("Term mismatch for device {}: {} != {}", deviceId, flowBucket.term(), replicaInfo);
                return false;
            }

            flowBuckets.compute(flowBucket.bucketId().bucket(),
                (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
            return true;
        } catch (Exception e) {
            log.warn("Failure processing backup request", e);
            return false;
        }
    }

    /**
     * Runs the anti-entropy protocol.
     */
    private void runAntiEntropy() {
        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
        if (!replicaInfo.isMaster(localNodeId)) {
            return;
        }

        for (NodeId nodeId : replicaInfo.backups()) {
            runAntiEntropy(nodeId);
        }
    }

    /**
     * Runs the anti-entropy protocol against the given peer.
     *
     * @param nodeId the node with which to execute the anti-entropy protocol
     */
    private void runAntiEntropy(NodeId nodeId) {
        backupAll().whenCompleteAsync((result, error) -> {
            requestDigests(nodeId).thenAcceptAsync((digests) -> {
                // Compare the remote digests with the local ones and reset the backup state for any
                // bucket that is stale on the peer so that it will be re-replicated.
                for (FlowBucketDigest remoteDigest : digests) {
                    FlowBucket localBucket = getBucket(remoteDigest.bucket());
                    if (localBucket.getDigest().isNewerThan(remoteDigest)) {
                        log.debug("Detected missing flow entries on node {} in bucket {}/{}",
                            nodeId, deviceId, remoteDigest.bucket());
                        resetBackup(new BackupOperation(nodeId, remoteDigest.bucket()));
                    }
                }
            }, executor);
        }, executor);
    }

    /**
     * Sends a digest request to the given node.
     *
     * @param nodeId the node to which to send the request
     * @return future to be completed with the set of digests for the given device on the given node
     */
    private CompletableFuture<Set<FlowBucketDigest>> requestDigests(NodeId nodeId) {
        return sendWithTimestamp(deviceId, getDigestsSubject, nodeId);
    }

    /**
     * Synchronizes flows from the previous master or backups.
     *
     * @param prevReplicaInfo the previous replica info
     * @param newReplicaInfo the new replica info
     */
    private void syncFlows(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
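        // If there was no previous term, activate immediately. If the previous master was a different node, sync
        // from it, falling back to the previous backups on failure; otherwise sync from the previous backups.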
        if (prevReplicaInfo == null) {
            activateMaster(newReplicaInfo);
        } else if (prevReplicaInfo.master() != null && !prevReplicaInfo.master().equals(localNodeId)) {
            syncFlowsOnMaster(prevReplicaInfo, newReplicaInfo);
        } else {
            syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
        }
    }

    /**
     * Synchronizes flows from the previous master, falling back to backups if the master fails.
     *
     * @param prevReplicaInfo the previous replica info
     * @param newReplicaInfo the new replica info
     */
    private void syncFlowsOnMaster(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
        syncFlowsOn(prevReplicaInfo.master())
            .whenCompleteAsync((result, error) -> {
                if (error != null) {
                    log.debug("Failed to synchronize flows on previous master {}", prevReplicaInfo.master(), error);
                    syncFlowsOnBackups(prevReplicaInfo, newReplicaInfo);
                } else {
                    activateMaster(newReplicaInfo);
                }
            }, executor);
    }

    /**
     * Synchronizes flows from the previous backups.
     *
     * @param prevReplicaInfo the previous replica info
     * @param newReplicaInfo the new replica info
     */
    private void syncFlowsOnBackups(DeviceReplicaInfo prevReplicaInfo, DeviceReplicaInfo newReplicaInfo) {
        List<NodeId> backups = prevReplicaInfo.backups()
            .stream()
            .filter(nodeId -> !nodeId.equals(localNodeId))
            .collect(Collectors.toList());
        syncFlowsOn(backups)
            .whenCompleteAsync((result, error) -> {
                if (error != null) {
                    log.debug("Failed to synchronize flows on previous backup nodes {}", backups, error);
                }
                activateMaster(newReplicaInfo);
            }, executor);
    }

    /**
     * Synchronizes flows for the device on the given nodes.
     *
     * @param nodes the nodes via which to synchronize the flows
     * @return a future to be completed once flows have been synchronized
     */
    private CompletableFuture<Void> syncFlowsOn(Collection<NodeId> nodes) {
        return nodes.isEmpty()
            ? CompletableFuture.completedFuture(null)
            : Tools.firstOf(nodes.stream()
                .map(node -> syncFlowsOn(node))
                .collect(Collectors.toList()))
                .thenApply(v -> null);
    }

    /**
     * Synchronizes flows for the device from the given node.
     *
     * @param nodeId the node from which to synchronize flows
     * @return a future to be completed once the flows have been synchronized
     */
    private CompletableFuture<Void> syncFlowsOn(NodeId nodeId) {
        return requestDigests(nodeId)
            .thenCompose(digests -> Tools.allOf(digests.stream()
                .filter(digest -> digest.isNewerThan(getDigest(digest.bucket())))
                .map(digest -> syncBucketOn(nodeId, digest.bucket()))
                .collect(Collectors.toList())))
            .thenApply(v -> null);
    }

    /**
     * Synchronizes the given bucket on the given node.
     *
     * @param nodeId the node on which to synchronize the bucket
     * @param bucketNumber the bucket to synchronize
     * @return a future to be completed once the bucket has been synchronized
     */
    private CompletableFuture<Void> syncBucketOn(NodeId nodeId, int bucketNumber) {
        return requestBucket(nodeId, bucketNumber)
            .thenAcceptAsync(flowBucket -> {
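                // Keep whichever copy of the bucket carries the newer digest.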
                flowBuckets.compute(flowBucket.bucketId().bucket(),
                    (id, bucket) -> flowBucket.getDigest().isNewerThan(bucket.getDigest()) ? flowBucket : bucket);
            }, executor);
    }

    /**
     * Requests the given bucket from the given node.
     *
     * @param nodeId the node from which to request the bucket
     * @param bucket the bucket to request
     * @return a future to be completed with the bucket
     */
    private CompletableFuture<FlowBucket> requestBucket(NodeId nodeId, int bucket) {
        log.debug("Requesting flow bucket {} from {}", bucket, nodeId);
        return sendWithTimestamp(bucket, getBucketSubject, nodeId);
    }

    /**
     * Handles a flow bucket request.
     *
     * @param bucketId the bucket number
     * @return the flow bucket
     */
    private FlowBucket onGetBucket(int bucketId) {
        return flowBuckets.get(bucketId).copy();
    }

    /**
     * Activates the new master term.
     *
     * @param replicaInfo the new replica info
     */
    private void activateMaster(DeviceReplicaInfo replicaInfo) {
        if (replicaInfo.isMaster(localNodeId)) {
            log.debug("Activating term {} for device {}", replicaInfo.term(), deviceId);
            for (int i = 0; i < NUM_BUCKETS; i++) {
                activateBucket(i);
            }
            lifecycleManager.activate(replicaInfo.term());
            activeTerm = replicaInfo.term();
        }
    }

    /**
     * Activates the given bucket number.
     *
     * @param bucket the bucket number to activate
     */
    private void activateBucket(int bucket) {
        Queue<Runnable> tasks = flowTasks.remove(bucket);
        if (tasks != null) {
            log.debug("Completing enqueued operations for device {}", deviceId);
            tasks.forEach(task -> task.run());
        }
    }

    /**
     * Handles a lifecycle event.
     */
    private void onLifecycleEvent(LifecycleEvent event) {
        log.debug("Received lifecycle event for device {}: {}", deviceId, event);
        switch (event.type()) {
            case TERM_START:
                startTerm(event.subject());
                break;
            case TERM_ACTIVE:
                activateTerm(event.subject());
                break;
            case TERM_UPDATE:
                updateTerm(event.subject());
                break;
            default:
                break;
        }
    }

    /**
     * Handles a replica change at the start of a new term.
     */
    private void startTerm(DeviceReplicaInfo replicaInfo) {
        DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
        this.replicaInfo = replicaInfo;
        if (replicaInfo.isMaster(localNodeId)) {
            log.info("Synchronizing device {} flows for term {}", deviceId, replicaInfo.term());
            syncFlows(oldReplicaInfo, replicaInfo);
        }
    }

    /**
     * Handles the activation of a term.
     */
    private void activateTerm(DeviceReplicaInfo replicaInfo) {
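        // Ignore activations for older terms; adopt the replica info if it is for a newer term.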
        if (replicaInfo.term() < this.replicaInfo.term()) {
            return;
        }
        if (replicaInfo.term() > this.replicaInfo.term()) {
            this.replicaInfo = replicaInfo;
        }

        // If the local node is neither the master nor a backup for the device, and the cluster has more
        // nodes than the master plus the guaranteed backup count, clear the flow table.
        if (!replicaInfo.isMaster(localNodeId) && !replicaInfo.isBackup(localNodeId) &&
            (clusterService.getNodes().size() > 1 + ECFlowRuleStore.backupCount)) {
            flowBuckets.values().forEach(bucket -> bucket.clear());
        }
        activeTerm = replicaInfo.term();
    }

    /**
     * Handles an update to a term.
     */
    private void updateTerm(DeviceReplicaInfo replicaInfo) {
        DeviceReplicaInfo oldReplicaInfo = this.replicaInfo;
        if (oldReplicaInfo != null && replicaInfo.term() == oldReplicaInfo.term()) {
            this.replicaInfo = replicaInfo;

            // If the local node is neither the master nor a backup for the device *and the term is active*,
            // and the cluster has more nodes than the master plus the guaranteed backup count, clear the flow table.
            if (activeTerm == replicaInfo.term()
                && !replicaInfo.isMaster(localNodeId)
                && !replicaInfo.isBackup(localNodeId)
                && (clusterService.getNodes().size() > 1 + ECFlowRuleStore.backupCount)) {
                flowBuckets.values().forEach(bucket -> bucket.clear());
            }
        }
    }

    /**
     * Sends a message to the given node wrapped in a Lamport timestamp.
     * <p>
     * Messages are sent in a {@link Timestamped} wrapper and are expected to be received in a {@link Timestamped}
     * wrapper. The internal {@link LogicalClock} is automatically updated on both send and receive.
     *
     * @param message the message to send
     * @param subject the message subject
     * @param toNodeId the node to which to send the message
     * @param <M> the message type
     * @param <R> the response type
     * @return a future to be completed with the response
     */
    private <M, R> CompletableFuture<R> sendWithTimestamp(M message, MessageSubject subject, NodeId toNodeId) {
        return clusterCommunicator.<Timestamped<M>, Timestamped<R>>sendAndReceive(
            clock.timestamp(message), subject, SERIALIZER::encode, SERIALIZER::decode, toNodeId)
            .thenApply(response -> {
                clock.tick(response.timestamp());
                return response.value();
            });
    }

    /**
     * Receives messages to the given subject wrapped in Lamport timestamps.
     * <p>
     * Messages are expected to be received in a {@link Timestamped} wrapper and are sent back in a {@link Timestamped}
     * wrapper. The internal {@link LogicalClock} is automatically updated on both receive and send.
     *
     * @param subject the subject for which to register the subscriber
     * @param function the raw message handler
     * @param <M> the raw message type
     * @param <R> the raw response type
     */
    private <M, R> void receiveWithTimestamp(MessageSubject subject, Function<M, R> function) {
        clusterCommunicator.<Timestamped<M>, Timestamped<R>>addSubscriber(subject, SERIALIZER::decode, request -> {
            clock.tick(request.timestamp());
            return clock.timestamp(function.apply(request.value()));
        }, SERIALIZER::encode, executor);
    }

    /**
     * Registers internal message subscribers.
     */
    private void registerSubscribers() {
        receiveWithTimestamp(getDigestsSubject, v -> getDigests());
        receiveWithTimestamp(getBucketSubject, this::onGetBucket);
        receiveWithTimestamp(backupSubject, this::onBackup);
        clusterCommunicator.<BucketId, Set<FlowEntry>>addSubscriber(
            getFlowsSubject, SERIALIZER::decode, this::getFlowEntries, SERIALIZER::encode);
    }

    /**
     * Unregisters internal message subscribers.
     */
    private void unregisterSubscribers() {
        clusterCommunicator.removeSubscriber(getDigestsSubject);
        clusterCommunicator.removeSubscriber(getBucketSubject);
        clusterCommunicator.removeSubscriber(backupSubject);
        clusterCommunicator.removeSubscriber(getFlowsSubject);
    }

    /**
     * Adds internal event listeners.
     */
    private void addListeners() {
        lifecycleManager.addListener(lifecycleEventListener);
    }

    /**
     * Removes internal event listeners.
     */
    private void removeListeners() {
        lifecycleManager.removeListener(lifecycleEventListener);
    }

    /**
     * Cancels recurrent scheduled futures.
     */
    private synchronized void cancelFutures() {
        ScheduledFuture<?> antiEntropyFuture = this.antiEntropyFuture;
        if (antiEntropyFuture != null) {
            antiEntropyFuture.cancel(false);
        }
    }

    /**
     * Purges the flow table.
     */
    public void purge() {
        flowTasks.clear();
        flowBuckets.values().forEach(bucket -> bucket.purge());
        lastBackupTimes.clear();
        inFlightUpdates.clear();
    }

    /**
     * Closes the device flow table.
     */
    public void close() {
        removeListeners();
        unregisterSubscribers();
        cancelFutures();
        lifecycleManager.close();
    }
}