blob: cd85a75b9708e2433cfbf6f7f3e662c27d755656 [file] [log] [blame]
Madan Jampani86940d92015-05-06 11:47:57 -07001 /*
2 * Copyright 2014-2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
18import com.google.common.base.Objects;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
22import com.google.common.util.concurrent.Futures;
23
24import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Modified;
28import org.apache.felix.scr.annotations.Property;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.onlab.util.KryoNamespace;
33import org.onlab.util.NewConcurrentHashMap;
34import org.onlab.util.Tools;
35import org.onosproject.cfg.ComponentConfigService;
36import org.onosproject.cluster.ClusterService;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.core.CoreService;
39import org.onosproject.core.IdGenerator;
40import org.onosproject.mastership.MastershipService;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.device.DeviceService;
43import org.onosproject.net.flow.CompletedBatchOperation;
44import org.onosproject.net.flow.DefaultFlowEntry;
45import org.onosproject.net.flow.FlowEntry;
46import org.onosproject.net.flow.FlowEntry.FlowEntryState;
47import org.onosproject.net.flow.FlowId;
48import org.onosproject.net.flow.FlowRule;
49import org.onosproject.net.flow.FlowRuleBatchEntry;
50import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
51import org.onosproject.net.flow.FlowRuleBatchEvent;
52import org.onosproject.net.flow.FlowRuleBatchOperation;
53import org.onosproject.net.flow.FlowRuleBatchRequest;
54import org.onosproject.net.flow.FlowRuleEvent;
55import org.onosproject.net.flow.FlowRuleEvent.Type;
56import org.onosproject.net.flow.FlowRuleService;
57import org.onosproject.net.flow.FlowRuleStore;
58import org.onosproject.net.flow.FlowRuleStoreDelegate;
59import org.onosproject.net.flow.StoredFlowEntry;
60import org.onosproject.store.AbstractStore;
61import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
62import org.onosproject.store.cluster.messaging.ClusterMessage;
63import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
64import org.onosproject.store.flow.ReplicaInfo;
65import org.onosproject.store.flow.ReplicaInfoService;
66import org.onosproject.store.serializers.KryoSerializer;
67import org.onosproject.store.serializers.StoreSerializer;
68import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
69import org.osgi.service.component.ComponentContext;
70import org.slf4j.Logger;
71
72import java.util.Arrays;
73import java.util.Collections;
74import java.util.Dictionary;
75import java.util.HashSet;
76import java.util.List;
77import java.util.Map;
78import java.util.Set;
79import java.util.concurrent.ConcurrentHashMap;
80import java.util.concurrent.ConcurrentMap;
81import java.util.concurrent.ExecutorService;
82import java.util.concurrent.Executors;
83import java.util.concurrent.ScheduledExecutorService;
84import java.util.concurrent.TimeUnit;
85import java.util.concurrent.atomic.AtomicInteger;
86import java.util.stream.Collectors;
87
88import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
89import static com.google.common.base.Strings.isNullOrEmpty;
90import static org.onlab.util.Tools.get;
91import static org.onlab.util.Tools.groupedThreads;
92import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
93import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
94import static org.slf4j.LoggerFactory.getLogger;
95
96/**
97 * Manages inventory of flow rules using a distributed state management protocol.
98 */
99@Component(immediate = true, enabled = true)
100@Service
101public class NewDistributedFlowRuleStore
102 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
103 implements FlowRuleStore {
104
    // Logger named after the runtime class of this store.
    private final Logger log = getLogger(getClass());

    // Default number of threads in the cluster message handler pool.
    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
    // Default value of the "backupEnabled" configuration property.
    private static final boolean DEFAULT_BACKUP_ENABLED = true;
    // Timeout, in milliseconds, used when waiting for a reply to a request forwarded to a master node.
    private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;

    // Configurable handler pool size; modified() replaces the handler executor when this value changes.
    @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
            label = "Number of threads in the message handler pool")
    private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;

    // Configurable backup flag; updated by modified() and reported by logConfig().
    @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
            label = "Indicates whether backups are enabled or not")
    private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
118
    // Local in-memory flow table, keyed by device and then by flow id.
    private InternalFlowTable flowTable = new InternalFlowTable();

    // Mandatory service references, declared through the SCR @Reference annotation.

    // Source of master/backup placement for each device.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ReplicaInfoService replicaInfoManager;

    // Messaging service used to forward requests to, and receive them from, other nodes.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;

    // Used in activate() to look up the identifier of the local node.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    // Used by getFlowRuleCount() to enumerate devices.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected DeviceService deviceService;

    // Used in activate() to obtain the batch operation id generator.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected CoreService coreService;

    // Used to register and unregister this component's configurable properties.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ComponentConfigService configService;

    // Used by the flow table to find the devices mastered by the local node.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;

    // Maps a batch operation id to the node that forwarded it: filled in by OnStoreBatch,
    // consumed by batchOperationComplete() to decide where the result must be sent.
    private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
    // Executor that runs the cluster message handlers; created in activate(), replaced in modified().
    private ExecutorService messageHandlingExecutor;

    // Single-threaded scheduler on which activate() schedules the periodic flow table backup.
    private final ScheduledExecutorService backupSenderExecutor =
            Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
147
    // Serializer used for every message this store encodes or decodes. Its Kryo pool registers the
    // common store types and, starting at STORE_CUSTOM_BEGIN, FlowRuleEvent and FlowRuleEvent.Type.
    protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
        @Override
        protected void setupKryoPool() {
            serializerPool = KryoNamespace.newBuilder()
                    .register(DistributedStoreSerializers.STORE_COMMON)
                    .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
                    .register(FlowRuleEvent.class)
                    .register(FlowRuleEvent.Type.class)
                    .build();
        }
    };

    // Generator of batch operation identifiers; obtained in activate() for FLOW_OP_TOPIC.
    private IdGenerator idGenerator;
    // Identifier of the local cluster node; set in activate().
    private NodeId local;
162
163 @Activate
164 public void activate(ComponentContext context) {
165 configService.registerProperties(getClass());
166
167 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
168
169 local = clusterService.getLocalNode().id();
170
171 messageHandlingExecutor = Executors.newFixedThreadPool(
172 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
173
174 registerMessageHandlers(messageHandlingExecutor);
175
176 backupSenderExecutor.scheduleWithFixedDelay(() -> flowTable.backup(), 0, 2000, TimeUnit.MILLISECONDS);
177
178 logConfig("Started");
179 }
180
    /**
     * Deactivates the store: unregisters the configurable properties and the
     * cluster message handlers, then stops both executors.
     *
     * @param context component context (not used by this method)
     */
    @Deactivate
    public void deactivate(ComponentContext context) {
        configService.unregisterProperties(getClass(), false);
        // Unsubscribe before stopping the executor the handlers run on.
        unregisterMessageHandlers();
        messageHandlingExecutor.shutdownNow();
        backupSenderExecutor.shutdownNow();
        log.info("Stopped");
    }
189
190 @SuppressWarnings("rawtypes")
191 @Modified
192 public void modified(ComponentContext context) {
193 if (context == null) {
194 backupEnabled = DEFAULT_BACKUP_ENABLED;
195 logConfig("Default config");
196 return;
197 }
198
199 Dictionary properties = context.getProperties();
200 int newPoolSize;
201 boolean newBackupEnabled;
202 try {
203 String s = get(properties, "msgHandlerPoolSize");
204 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
205
206 s = get(properties, "backupEnabled");
207 newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
208
209 } catch (NumberFormatException | ClassCastException e) {
210 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
211 newBackupEnabled = DEFAULT_BACKUP_ENABLED;
212 }
213
214 if (newBackupEnabled != backupEnabled) {
215 backupEnabled = newBackupEnabled;
216 }
217 if (newPoolSize != msgHandlerPoolSize) {
218 msgHandlerPoolSize = newPoolSize;
219 ExecutorService oldMsgHandler = messageHandlingExecutor;
220 messageHandlingExecutor = Executors.newFixedThreadPool(
221 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
222
223 // replace previously registered handlers.
224 registerMessageHandlers(messageHandlingExecutor);
225 oldMsgHandler.shutdown();
226 }
227 logConfig("Reconfigured");
228 }
229
230 private void registerMessageHandlers(ExecutorService executor) {
231
232 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
233 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
234 REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
235 clusterCommunicator.addSubscriber(
236 GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
237 clusterCommunicator.addSubscriber(
238 GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
239 clusterCommunicator.addSubscriber(
240 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
241 clusterCommunicator.addSubscriber(
242 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
243 clusterCommunicator.addSubscriber(
244 FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, executor);
245 }
246
247 private void unregisterMessageHandlers() {
248 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
249 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
250 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
251 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
252 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
253 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
254 }
255
256 private void logConfig(String prefix) {
257 log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}",
258 prefix, msgHandlerPoolSize, backupEnabled);
259 }
260
261 // This is not a efficient operation on a distributed sharded
262 // flow store. We need to revisit the need for this operation or at least
263 // make it device specific.
264 @Override
265 public int getFlowRuleCount() {
266 AtomicInteger sum = new AtomicInteger(0);
267 deviceService.getDevices().forEach(device -> sum.addAndGet(Iterables.size(getFlowEntries(device.id()))));
268 return sum.get();
269 }
270
271 @Override
272 public FlowEntry getFlowEntry(FlowRule rule) {
273
274 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
275 NodeId master = replicaInfo.master().orNull();
276
277 if (master == null) {
278 log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
279 return null;
280 }
281
282 if (Objects.equal(local, master)) {
283 return flowTable.getFlowEntry(rule);
284 }
285
286 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
287 master, rule.deviceId());
288
289 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
290 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
291 SERIALIZER::encode,
292 SERIALIZER::decode,
293 master),
294 FLOW_RULE_STORE_TIMEOUT_MILLIS,
295 TimeUnit.MILLISECONDS,
296 null);
297 }
298
299 @Override
300 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
301
302 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
303 NodeId master = replicaInfo.master().orNull();
304
305 if (master == null) {
306 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
307 return Collections.emptyList();
308 }
309
310 if (Objects.equal(local, master)) {
311 return flowTable.getFlowEntries(deviceId);
312 }
313
314 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
315 master, deviceId);
316
317 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
318 FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
319 SERIALIZER::encode,
320 SERIALIZER::decode,
321 master),
322 FLOW_RULE_STORE_TIMEOUT_MILLIS,
323 TimeUnit.MILLISECONDS,
324 Collections.emptyList());
325 }
326
327 @Override
328 public void storeFlowRule(FlowRule rule) {
329 storeBatch(new FlowRuleBatchOperation(
330 Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
331 rule.deviceId(), idGenerator.getNewId()));
332 }
333
334 @Override
335 public void storeBatch(FlowRuleBatchOperation operation) {
336 if (operation.getOperations().isEmpty()) {
337 notifyDelegate(FlowRuleBatchEvent.completed(
338 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
339 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
340 return;
341 }
342
343 DeviceId deviceId = operation.deviceId();
344
345 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
346 NodeId master = replicaInfo.master().orNull();
347
348 if (master == null) {
349 log.warn("No master for {} : flows will be marked for removal", deviceId);
350
351 updateStoreInternal(operation);
352
353 notifyDelegate(FlowRuleBatchEvent.completed(
354 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
355 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
356 return;
357 }
358
359 if (Objects.equal(local, master)) {
360 storeBatchInternal(operation);
361 return;
362 }
363
364 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
365 master, deviceId);
366
367 if (!clusterCommunicator.unicast(operation,
368 APPLY_BATCH_FLOWS,
369 SERIALIZER::encode,
370 master)) {
371 log.warn("Failed to storeBatch: {} to {}", operation, replicaInfo.master());
372
373 Set<FlowRule> allFailures = operation.getOperations().stream()
374 .map(op -> op.target())
375 .collect(Collectors.toSet());
376
377 notifyDelegate(FlowRuleBatchEvent.completed(
378 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
379 new CompletedBatchOperation(false, allFailures, deviceId)));
380 return;
381 }
382 }
383
384 private void storeBatchInternal(FlowRuleBatchOperation operation) {
385
386 final DeviceId did = operation.deviceId();
387 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
388 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
389 if (currentOps.isEmpty()) {
390 batchOperationComplete(FlowRuleBatchEvent.completed(
391 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
392 new CompletedBatchOperation(true, Collections.emptySet(), did)));
393 return;
394 }
395
396 notifyDelegate(FlowRuleBatchEvent.requested(new
397 FlowRuleBatchRequest(operation.id(),
398 currentOps), operation.deviceId()));
399 }
400
401 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
402 return operation.getOperations().stream().map(
403 op -> {
404 StoredFlowEntry entry;
405 switch (op.operator()) {
406 case ADD:
407 entry = new DefaultFlowEntry(op.target());
408 // always add requested FlowRule
409 // Note: 2 equal FlowEntry may have different treatment
410 flowTable.remove(entry.deviceId(), entry);
411 flowTable.add(entry);
412
413 return op;
414 case REMOVE:
415 entry = flowTable.getFlowEntry(op.target());
416 if (entry != null) {
417 entry.setState(FlowEntryState.PENDING_REMOVE);
418 return op;
419 }
420 break;
421 case MODIFY:
422 //TODO: figure this out at some point
423 break;
424 default:
425 log.warn("Unknown flow operation operator: {}", op.operator());
426 }
427 return null;
428 }
429 ).filter(op -> op != null).collect(Collectors.toSet());
430 }
431
432 @Override
433 public void deleteFlowRule(FlowRule rule) {
434 storeBatch(
435 new FlowRuleBatchOperation(
436 Arrays.asList(
437 new FlowRuleBatchEntry(
438 FlowRuleOperation.REMOVE,
439 rule)), rule.deviceId(), idGenerator.getNewId()));
440 }
441
442 @Override
443 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
444 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
445 if (Objects.equal(local, replicaInfo.master().orNull())) {
446 return addOrUpdateFlowRuleInternal(rule);
447 }
448
449 log.warn("Tried to update FlowRule {} state,"
450 + " while the Node was not the master.", rule);
451 return null;
452 }
453
454 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
455 // check if this new rule is an update to an existing entry
456 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
457 if (stored != null) {
458 stored.setBytes(rule.bytes());
459 stored.setLife(rule.life());
460 stored.setPackets(rule.packets());
461 if (stored.state() == FlowEntryState.PENDING_ADD) {
462 stored.setState(FlowEntryState.ADDED);
463 return new FlowRuleEvent(Type.RULE_ADDED, rule);
464 }
465 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
466 }
467
468 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
469 // TODO: also update backup if the behavior is correct.
470 flowTable.add(rule);
471 return null;
472 }
473
474 @Override
475 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
476 final DeviceId deviceId = rule.deviceId();
477 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
478 NodeId master = replicaInfo.master().orNull();
479
480 if (Objects.equal(local, master)) {
481 // bypass and handle it locally
482 return removeFlowRuleInternal(rule);
483 }
484
485 if (master == null) {
486 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
487 // TODO: revisit if this should be null (="no-op") or Exception
488 return null;
489 }
490
491 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
492 master, deviceId);
493
494 return Futures.get(clusterCommunicator.sendAndReceive(
495 rule,
496 REMOVE_FLOW_ENTRY,
497 SERIALIZER::encode,
498 SERIALIZER::decode,
499 master),
500 FLOW_RULE_STORE_TIMEOUT_MILLIS,
501 TimeUnit.MILLISECONDS,
502 RuntimeException.class);
503 }
504
505 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
506 final DeviceId deviceId = rule.deviceId();
507 // This is where one could mark a rule as removed and still keep it in the store.
508 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
509 return removed ? new FlowRuleEvent(RULE_REMOVED, rule) : null;
510 }
511
512 @Override
513 public void batchOperationComplete(FlowRuleBatchEvent event) {
514 //FIXME: need a per device pending response
515 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
516 if (nodeId == null) {
517 notifyDelegate(event);
518 } else {
519 // TODO check unicast return value
520 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
521 //error log: log.warn("Failed to respond to peer for batch operation result");
522 }
523 }
524
525 private final class OnStoreBatch implements ClusterMessageHandler {
526
527 @Override
528 public void handle(final ClusterMessage message) {
529 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
530 log.debug("received batch request {}", operation);
531
532 final DeviceId deviceId = operation.deviceId();
533 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
534 if (!local.equals(replicaInfo.master().orNull())) {
535
536 Set<FlowRule> failures = new HashSet<>(operation.size());
537 for (FlowRuleBatchEntry op : operation.getOperations()) {
538 failures.add(op.target());
539 }
540 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
541 // This node is no longer the master, respond as all failed.
542 // TODO: we might want to wrap response in envelope
543 // to distinguish sw programming failure and hand over
544 // it make sense in the latter case to retry immediately.
545 message.respond(SERIALIZER.encode(allFailed));
546 return;
547 }
548
549 pendingResponses.put(operation.id(), message.sender());
550 storeBatchInternal(operation);
551 }
552 }
553
554 private class InternalFlowTable {
555
556 private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
557 flowEntries = new ConcurrentHashMap<>();
558
559 private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
560 private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
561 private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
562
563 private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
564 return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
565 }
566
567 /**
568 * Returns the flow table for specified device.
569 *
570 * @param deviceId identifier of the device
571 * @return Map representing Flow Table of given device.
572 */
573 private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
574 return createIfAbsentUnchecked(flowEntries, deviceId, lazyEmptyFlowTable());
575 }
576
577 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
578 return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
579 }
580
581 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
582 Set<StoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.deviceId(), rule.id());
583 return flowEntries.stream()
584 .filter(entry -> Objects.equal(entry, rule))
585 .findAny()
586 .orElse(null);
587 }
588
589 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
590 Set<FlowEntry> result = Sets.newHashSet();
591 getFlowTable(deviceId).values().forEach(result::addAll);
592 return result;
593 }
594
595 public StoredFlowEntry getFlowEntry(FlowRule rule) {
596 return getFlowEntryInternal(rule);
597 }
598
599 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
600 return getFlowEntriesInternal(deviceId);
601 }
602
603 public void add(FlowEntry rule) {
604 getFlowEntriesInternal(rule.deviceId(), rule.id()).add((StoredFlowEntry) rule);
605 lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
606 }
607
608 public boolean remove(DeviceId deviceId, FlowEntry rule) {
609 try {
610 return getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
611 } finally {
612 lastUpdateTimes.put(deviceId, System.currentTimeMillis());
613 }
614 }
615
616 private NodeId getBackupNode(DeviceId deviceId) {
617 List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
618 // pick the standby which is most likely to become next master
619 return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
620 }
621
622 private void backup() {
623 //TODO: Force backup when backups change.
624 try {
625 // determine the set of devices that we need to backup during this run.
626 Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
627 .stream()
628 .filter(deviceId -> {
629 Long lastBackupTime = lastBackupTimes.get(deviceId);
630 Long lastUpdateTime = lastUpdateTimes.get(deviceId);
631 NodeId lastBackupNode = lastBackupNodes.get(deviceId);
632 return lastBackupTime == null
633 || !Objects.equal(lastBackupNode, getBackupNode(deviceId))
634 || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
635 })
636 .collect(Collectors.toSet());
637
638 // compute a mapping from node to the set of devices whose flow entries it should backup
639 Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
640 devicesToBackup.forEach(deviceId -> {
641 NodeId backupLocation = getBackupNode(deviceId);
642 if (backupLocation != null) {
643 devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
644 .add(deviceId);
645 }
646 });
647
648 // send the device flow entries to their respective backup nodes
649 devicesToBackupByNode.forEach((nodeId, deviceIds) -> {
650 Map<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
651 Maps.newConcurrentMap();
652 flowEntries.forEach((key, value) -> {
653 if (deviceIds.contains(key)) {
654 deviceFlowEntries.put(key, value);
655 }
656 });
657 clusterCommunicator.unicast(deviceFlowEntries,
658 FLOW_TABLE_BACKUP,
659 SERIALIZER::encode,
660 nodeId);
661 });
662
663 // update state for use in subsequent run.
664 devicesToBackupByNode.forEach((node, devices) -> {
665 devices.forEach(id -> {
666 lastBackupTimes.put(id, System.currentTimeMillis());
667 lastBackupNodes.put(id, node);
668 });
669 });
670 } catch (Exception e) {
671 log.error("Backup failed.", e);
672 }
673 }
674
675 private void onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
676 Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
677 Maps.filterKeys(flowTables, managedDevices::contains).forEach((deviceId, flowTable) -> {
678 Map<FlowId, Set<StoredFlowEntry>> deviceFlowTable = getFlowTable(deviceId);
679 deviceFlowTable.clear();
680 deviceFlowTable.putAll(flowTable);
681 });
682 }
683 }
684}