blob: e042982932d1649349c83e8e680d672ccfc3b608 [file] [log] [blame]
Madan Jampani86940d92015-05-06 11:47:57 -07001 /*
2 * Copyright 2014-2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
18import com.google.common.base.Objects;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
22import com.google.common.util.concurrent.Futures;
23
24import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Modified;
28import org.apache.felix.scr.annotations.Property;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.onlab.util.KryoNamespace;
33import org.onlab.util.NewConcurrentHashMap;
34import org.onlab.util.Tools;
35import org.onosproject.cfg.ComponentConfigService;
36import org.onosproject.cluster.ClusterService;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.core.CoreService;
39import org.onosproject.core.IdGenerator;
40import org.onosproject.mastership.MastershipService;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.device.DeviceService;
43import org.onosproject.net.flow.CompletedBatchOperation;
44import org.onosproject.net.flow.DefaultFlowEntry;
45import org.onosproject.net.flow.FlowEntry;
46import org.onosproject.net.flow.FlowEntry.FlowEntryState;
47import org.onosproject.net.flow.FlowId;
48import org.onosproject.net.flow.FlowRule;
49import org.onosproject.net.flow.FlowRuleBatchEntry;
50import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
51import org.onosproject.net.flow.FlowRuleBatchEvent;
52import org.onosproject.net.flow.FlowRuleBatchOperation;
53import org.onosproject.net.flow.FlowRuleBatchRequest;
54import org.onosproject.net.flow.FlowRuleEvent;
55import org.onosproject.net.flow.FlowRuleEvent.Type;
56import org.onosproject.net.flow.FlowRuleService;
57import org.onosproject.net.flow.FlowRuleStore;
58import org.onosproject.net.flow.FlowRuleStoreDelegate;
59import org.onosproject.net.flow.StoredFlowEntry;
60import org.onosproject.store.AbstractStore;
61import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
62import org.onosproject.store.cluster.messaging.ClusterMessage;
63import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
64import org.onosproject.store.flow.ReplicaInfo;
65import org.onosproject.store.flow.ReplicaInfoService;
66import org.onosproject.store.serializers.KryoSerializer;
67import org.onosproject.store.serializers.StoreSerializer;
68import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
69import org.osgi.service.component.ComponentContext;
70import org.slf4j.Logger;
71
72import java.util.Arrays;
73import java.util.Collections;
74import java.util.Dictionary;
75import java.util.HashSet;
76import java.util.List;
77import java.util.Map;
78import java.util.Set;
79import java.util.concurrent.ConcurrentHashMap;
80import java.util.concurrent.ConcurrentMap;
81import java.util.concurrent.ExecutorService;
82import java.util.concurrent.Executors;
83import java.util.concurrent.ScheduledExecutorService;
Madan Jampani08bf17b2015-05-06 16:25:26 -070084import java.util.concurrent.ScheduledFuture;
Madan Jampani86940d92015-05-06 11:47:57 -070085import java.util.concurrent.TimeUnit;
86import java.util.concurrent.atomic.AtomicInteger;
87import java.util.stream.Collectors;
88
89import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
90import static com.google.common.base.Strings.isNullOrEmpty;
91import static org.onlab.util.Tools.get;
92import static org.onlab.util.Tools.groupedThreads;
93import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
94import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
95import static org.slf4j.LoggerFactory.getLogger;
96
97/**
98 * Manages inventory of flow rules using a distributed state management protocol.
99 */
100@Component(immediate = true, enabled = true)
101@Service
102public class NewDistributedFlowRuleStore
103 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
104 implements FlowRuleStore {
105
106 private final Logger log = getLogger(getClass());
107
108 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
109 private static final boolean DEFAULT_BACKUP_ENABLED = true;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700110 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
Madan Jampani86940d92015-05-06 11:47:57 -0700111 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
112
113 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
114 label = "Number of threads in the message handler pool")
115 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
116
117 @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
118 label = "Indicates whether backups are enabled or not")
119 private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
120
Madan Jampani08bf17b2015-05-06 16:25:26 -0700121 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
122 label = "Delay in ms between successive backup runs")
123 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
124
Madan Jampani86940d92015-05-06 11:47:57 -0700125 private InternalFlowTable flowTable = new InternalFlowTable();
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 protected ReplicaInfoService replicaInfoManager;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected ClusterCommunicationService clusterCommunicator;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
134 protected ClusterService clusterService;
135
136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 protected DeviceService deviceService;
138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140 protected CoreService coreService;
141
142 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
143 protected ComponentConfigService configService;
144
145 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
146 protected MastershipService mastershipService;
147
148 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
149 private ExecutorService messageHandlingExecutor;
150
Madan Jampani08bf17b2015-05-06 16:25:26 -0700151 private ScheduledFuture<?> backupTask;
Madan Jampani86940d92015-05-06 11:47:57 -0700152 private final ScheduledExecutorService backupSenderExecutor =
153 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
154
155 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
156 @Override
157 protected void setupKryoPool() {
158 serializerPool = KryoNamespace.newBuilder()
159 .register(DistributedStoreSerializers.STORE_COMMON)
160 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
161 .register(FlowRuleEvent.class)
162 .register(FlowRuleEvent.Type.class)
163 .build();
164 }
165 };
166
167 private IdGenerator idGenerator;
168 private NodeId local;
169
170 @Activate
171 public void activate(ComponentContext context) {
172 configService.registerProperties(getClass());
173
174 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
175
176 local = clusterService.getLocalNode().id();
177
178 messageHandlingExecutor = Executors.newFixedThreadPool(
179 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
180
181 registerMessageHandlers(messageHandlingExecutor);
182
Madan Jampani08bf17b2015-05-06 16:25:26 -0700183 if (backupEnabled) {
184 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
185 flowTable::backup,
186 0,
187 backupPeriod,
188 TimeUnit.MILLISECONDS);
189 }
Madan Jampani86940d92015-05-06 11:47:57 -0700190
191 logConfig("Started");
192 }
193
194 @Deactivate
195 public void deactivate(ComponentContext context) {
196 configService.unregisterProperties(getClass(), false);
197 unregisterMessageHandlers();
198 messageHandlingExecutor.shutdownNow();
199 backupSenderExecutor.shutdownNow();
200 log.info("Stopped");
201 }
202
203 @SuppressWarnings("rawtypes")
204 @Modified
205 public void modified(ComponentContext context) {
206 if (context == null) {
207 backupEnabled = DEFAULT_BACKUP_ENABLED;
208 logConfig("Default config");
209 return;
210 }
211
212 Dictionary properties = context.getProperties();
213 int newPoolSize;
214 boolean newBackupEnabled;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700215 int newBackupPeriod;
Madan Jampani86940d92015-05-06 11:47:57 -0700216 try {
217 String s = get(properties, "msgHandlerPoolSize");
218 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
219
220 s = get(properties, "backupEnabled");
221 newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
222
Madan Jampani08bf17b2015-05-06 16:25:26 -0700223 s = get(properties, "backupPeriod");
224 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
225
Madan Jampani86940d92015-05-06 11:47:57 -0700226 } catch (NumberFormatException | ClassCastException e) {
227 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
228 newBackupEnabled = DEFAULT_BACKUP_ENABLED;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700229 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Madan Jampani86940d92015-05-06 11:47:57 -0700230 }
231
Madan Jampani08bf17b2015-05-06 16:25:26 -0700232 boolean restartBackupTask = false;
Madan Jampani86940d92015-05-06 11:47:57 -0700233 if (newBackupEnabled != backupEnabled) {
234 backupEnabled = newBackupEnabled;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700235 if (!backupEnabled && backupTask != null) {
236 backupTask.cancel(false);
237 backupTask = null;
238 }
239 restartBackupTask = backupEnabled;
240 }
241 if (newBackupPeriod != backupPeriod) {
242 backupPeriod = newBackupPeriod;
243 restartBackupTask = backupEnabled;
244 }
245 if (restartBackupTask) {
246 if (backupTask != null) {
247 // cancel previously running task
248 backupTask.cancel(false);
249 }
250 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
251 flowTable::backup,
252 0,
253 backupPeriod,
254 TimeUnit.MILLISECONDS);
Madan Jampani86940d92015-05-06 11:47:57 -0700255 }
256 if (newPoolSize != msgHandlerPoolSize) {
257 msgHandlerPoolSize = newPoolSize;
258 ExecutorService oldMsgHandler = messageHandlingExecutor;
259 messageHandlingExecutor = Executors.newFixedThreadPool(
260 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
261
262 // replace previously registered handlers.
263 registerMessageHandlers(messageHandlingExecutor);
264 oldMsgHandler.shutdown();
265 }
266 logConfig("Reconfigured");
267 }
268
269 private void registerMessageHandlers(ExecutorService executor) {
270
271 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
272 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
273 REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
274 clusterCommunicator.addSubscriber(
275 GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
276 clusterCommunicator.addSubscriber(
277 GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
278 clusterCommunicator.addSubscriber(
279 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
280 clusterCommunicator.addSubscriber(
281 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
282 clusterCommunicator.addSubscriber(
283 FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, executor);
284 }
285
286 private void unregisterMessageHandlers() {
287 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
288 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
289 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
290 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
291 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
292 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
293 }
294
295 private void logConfig(String prefix) {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700296 log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
297 prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
Madan Jampani86940d92015-05-06 11:47:57 -0700298 }
299
300 // This is not a efficient operation on a distributed sharded
301 // flow store. We need to revisit the need for this operation or at least
302 // make it device specific.
303 @Override
304 public int getFlowRuleCount() {
305 AtomicInteger sum = new AtomicInteger(0);
306 deviceService.getDevices().forEach(device -> sum.addAndGet(Iterables.size(getFlowEntries(device.id()))));
307 return sum.get();
308 }
309
310 @Override
311 public FlowEntry getFlowEntry(FlowRule rule) {
312
313 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
314 NodeId master = replicaInfo.master().orNull();
315
316 if (master == null) {
317 log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
318 return null;
319 }
320
321 if (Objects.equal(local, master)) {
322 return flowTable.getFlowEntry(rule);
323 }
324
325 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
326 master, rule.deviceId());
327
328 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
329 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
330 SERIALIZER::encode,
331 SERIALIZER::decode,
332 master),
333 FLOW_RULE_STORE_TIMEOUT_MILLIS,
334 TimeUnit.MILLISECONDS,
335 null);
336 }
337
338 @Override
339 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
340
341 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
342 NodeId master = replicaInfo.master().orNull();
343
344 if (master == null) {
345 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
346 return Collections.emptyList();
347 }
348
349 if (Objects.equal(local, master)) {
350 return flowTable.getFlowEntries(deviceId);
351 }
352
353 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
354 master, deviceId);
355
356 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
357 FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
358 SERIALIZER::encode,
359 SERIALIZER::decode,
360 master),
361 FLOW_RULE_STORE_TIMEOUT_MILLIS,
362 TimeUnit.MILLISECONDS,
363 Collections.emptyList());
364 }
365
366 @Override
367 public void storeFlowRule(FlowRule rule) {
368 storeBatch(new FlowRuleBatchOperation(
369 Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
370 rule.deviceId(), idGenerator.getNewId()));
371 }
372
373 @Override
374 public void storeBatch(FlowRuleBatchOperation operation) {
375 if (operation.getOperations().isEmpty()) {
376 notifyDelegate(FlowRuleBatchEvent.completed(
377 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
378 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
379 return;
380 }
381
382 DeviceId deviceId = operation.deviceId();
383
384 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
385 NodeId master = replicaInfo.master().orNull();
386
387 if (master == null) {
388 log.warn("No master for {} : flows will be marked for removal", deviceId);
389
390 updateStoreInternal(operation);
391
392 notifyDelegate(FlowRuleBatchEvent.completed(
393 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
394 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
395 return;
396 }
397
398 if (Objects.equal(local, master)) {
399 storeBatchInternal(operation);
400 return;
401 }
402
403 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
404 master, deviceId);
405
406 if (!clusterCommunicator.unicast(operation,
407 APPLY_BATCH_FLOWS,
408 SERIALIZER::encode,
409 master)) {
410 log.warn("Failed to storeBatch: {} to {}", operation, replicaInfo.master());
411
412 Set<FlowRule> allFailures = operation.getOperations().stream()
413 .map(op -> op.target())
414 .collect(Collectors.toSet());
415
416 notifyDelegate(FlowRuleBatchEvent.completed(
417 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
418 new CompletedBatchOperation(false, allFailures, deviceId)));
419 return;
420 }
421 }
422
423 private void storeBatchInternal(FlowRuleBatchOperation operation) {
424
425 final DeviceId did = operation.deviceId();
426 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
427 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
428 if (currentOps.isEmpty()) {
429 batchOperationComplete(FlowRuleBatchEvent.completed(
430 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
431 new CompletedBatchOperation(true, Collections.emptySet(), did)));
432 return;
433 }
434
435 notifyDelegate(FlowRuleBatchEvent.requested(new
436 FlowRuleBatchRequest(operation.id(),
437 currentOps), operation.deviceId()));
438 }
439
440 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
441 return operation.getOperations().stream().map(
442 op -> {
443 StoredFlowEntry entry;
444 switch (op.operator()) {
445 case ADD:
446 entry = new DefaultFlowEntry(op.target());
447 // always add requested FlowRule
448 // Note: 2 equal FlowEntry may have different treatment
449 flowTable.remove(entry.deviceId(), entry);
450 flowTable.add(entry);
451
452 return op;
453 case REMOVE:
454 entry = flowTable.getFlowEntry(op.target());
455 if (entry != null) {
456 entry.setState(FlowEntryState.PENDING_REMOVE);
457 return op;
458 }
459 break;
460 case MODIFY:
461 //TODO: figure this out at some point
462 break;
463 default:
464 log.warn("Unknown flow operation operator: {}", op.operator());
465 }
466 return null;
467 }
468 ).filter(op -> op != null).collect(Collectors.toSet());
469 }
470
471 @Override
472 public void deleteFlowRule(FlowRule rule) {
473 storeBatch(
474 new FlowRuleBatchOperation(
475 Arrays.asList(
476 new FlowRuleBatchEntry(
477 FlowRuleOperation.REMOVE,
478 rule)), rule.deviceId(), idGenerator.getNewId()));
479 }
480
481 @Override
482 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
483 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
484 if (Objects.equal(local, replicaInfo.master().orNull())) {
485 return addOrUpdateFlowRuleInternal(rule);
486 }
487
488 log.warn("Tried to update FlowRule {} state,"
489 + " while the Node was not the master.", rule);
490 return null;
491 }
492
493 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
494 // check if this new rule is an update to an existing entry
495 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
496 if (stored != null) {
497 stored.setBytes(rule.bytes());
498 stored.setLife(rule.life());
499 stored.setPackets(rule.packets());
500 if (stored.state() == FlowEntryState.PENDING_ADD) {
501 stored.setState(FlowEntryState.ADDED);
502 return new FlowRuleEvent(Type.RULE_ADDED, rule);
503 }
504 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
505 }
506
507 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
508 // TODO: also update backup if the behavior is correct.
509 flowTable.add(rule);
510 return null;
511 }
512
513 @Override
514 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
515 final DeviceId deviceId = rule.deviceId();
516 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
517 NodeId master = replicaInfo.master().orNull();
518
519 if (Objects.equal(local, master)) {
520 // bypass and handle it locally
521 return removeFlowRuleInternal(rule);
522 }
523
524 if (master == null) {
525 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
526 // TODO: revisit if this should be null (="no-op") or Exception
527 return null;
528 }
529
530 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
531 master, deviceId);
532
533 return Futures.get(clusterCommunicator.sendAndReceive(
534 rule,
535 REMOVE_FLOW_ENTRY,
536 SERIALIZER::encode,
537 SERIALIZER::decode,
538 master),
539 FLOW_RULE_STORE_TIMEOUT_MILLIS,
540 TimeUnit.MILLISECONDS,
541 RuntimeException.class);
542 }
543
544 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
545 final DeviceId deviceId = rule.deviceId();
546 // This is where one could mark a rule as removed and still keep it in the store.
547 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
548 return removed ? new FlowRuleEvent(RULE_REMOVED, rule) : null;
549 }
550
551 @Override
552 public void batchOperationComplete(FlowRuleBatchEvent event) {
553 //FIXME: need a per device pending response
554 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
555 if (nodeId == null) {
556 notifyDelegate(event);
557 } else {
558 // TODO check unicast return value
559 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
560 //error log: log.warn("Failed to respond to peer for batch operation result");
561 }
562 }
563
564 private final class OnStoreBatch implements ClusterMessageHandler {
565
566 @Override
567 public void handle(final ClusterMessage message) {
568 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
569 log.debug("received batch request {}", operation);
570
571 final DeviceId deviceId = operation.deviceId();
572 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
573 if (!local.equals(replicaInfo.master().orNull())) {
574
575 Set<FlowRule> failures = new HashSet<>(operation.size());
576 for (FlowRuleBatchEntry op : operation.getOperations()) {
577 failures.add(op.target());
578 }
579 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
580 // This node is no longer the master, respond as all failed.
581 // TODO: we might want to wrap response in envelope
582 // to distinguish sw programming failure and hand over
583 // it make sense in the latter case to retry immediately.
584 message.respond(SERIALIZER.encode(allFailed));
585 return;
586 }
587
588 pendingResponses.put(operation.id(), message.sender());
589 storeBatchInternal(operation);
590 }
591 }
592
593 private class InternalFlowTable {
594
595 private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
596 flowEntries = new ConcurrentHashMap<>();
597
598 private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
599 private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
600 private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
601
602 private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
603 return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
604 }
605
606 /**
607 * Returns the flow table for specified device.
608 *
609 * @param deviceId identifier of the device
610 * @return Map representing Flow Table of given device.
611 */
612 private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
613 return createIfAbsentUnchecked(flowEntries, deviceId, lazyEmptyFlowTable());
614 }
615
616 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
617 return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
618 }
619
620 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
621 Set<StoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.deviceId(), rule.id());
622 return flowEntries.stream()
623 .filter(entry -> Objects.equal(entry, rule))
624 .findAny()
625 .orElse(null);
626 }
627
628 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
629 Set<FlowEntry> result = Sets.newHashSet();
630 getFlowTable(deviceId).values().forEach(result::addAll);
631 return result;
632 }
633
634 public StoredFlowEntry getFlowEntry(FlowRule rule) {
635 return getFlowEntryInternal(rule);
636 }
637
638 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
639 return getFlowEntriesInternal(deviceId);
640 }
641
642 public void add(FlowEntry rule) {
643 getFlowEntriesInternal(rule.deviceId(), rule.id()).add((StoredFlowEntry) rule);
644 lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
645 }
646
647 public boolean remove(DeviceId deviceId, FlowEntry rule) {
648 try {
649 return getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
650 } finally {
651 lastUpdateTimes.put(deviceId, System.currentTimeMillis());
652 }
653 }
654
655 private NodeId getBackupNode(DeviceId deviceId) {
656 List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
657 // pick the standby which is most likely to become next master
658 return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
659 }
660
661 private void backup() {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700662 if (!backupEnabled) {
663 return;
664 }
Madan Jampani86940d92015-05-06 11:47:57 -0700665 //TODO: Force backup when backups change.
666 try {
667 // determine the set of devices that we need to backup during this run.
668 Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
669 .stream()
670 .filter(deviceId -> {
671 Long lastBackupTime = lastBackupTimes.get(deviceId);
672 Long lastUpdateTime = lastUpdateTimes.get(deviceId);
673 NodeId lastBackupNode = lastBackupNodes.get(deviceId);
674 return lastBackupTime == null
675 || !Objects.equal(lastBackupNode, getBackupNode(deviceId))
676 || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
677 })
678 .collect(Collectors.toSet());
679
680 // compute a mapping from node to the set of devices whose flow entries it should backup
681 Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
682 devicesToBackup.forEach(deviceId -> {
683 NodeId backupLocation = getBackupNode(deviceId);
684 if (backupLocation != null) {
685 devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
686 .add(deviceId);
687 }
688 });
689
690 // send the device flow entries to their respective backup nodes
691 devicesToBackupByNode.forEach((nodeId, deviceIds) -> {
692 Map<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
693 Maps.newConcurrentMap();
694 flowEntries.forEach((key, value) -> {
695 if (deviceIds.contains(key)) {
696 deviceFlowEntries.put(key, value);
697 }
698 });
699 clusterCommunicator.unicast(deviceFlowEntries,
700 FLOW_TABLE_BACKUP,
701 SERIALIZER::encode,
702 nodeId);
703 });
704
705 // update state for use in subsequent run.
706 devicesToBackupByNode.forEach((node, devices) -> {
707 devices.forEach(id -> {
708 lastBackupTimes.put(id, System.currentTimeMillis());
709 lastBackupNodes.put(id, node);
710 });
711 });
712 } catch (Exception e) {
713 log.error("Backup failed.", e);
714 }
715 }
716
717 private void onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
718 Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700719 // Only process those devices are that not managed by the local node.
720 Maps.filterKeys(flowTables, deviceId -> !managedDevices.contains(deviceId))
721 .forEach((deviceId, flowTable) -> {
722 Map<FlowId, Set<StoredFlowEntry>> deviceFlowTable = getFlowTable(deviceId);
723 deviceFlowTable.clear();
724 deviceFlowTable.putAll(flowTable);
725 });
Madan Jampani86940d92015-05-06 11:47:57 -0700726 }
727 }
728}