blob: 56c4dfb84c1b4706aaaa0a8b7bc3e6e3526c9a02 [file] [log] [blame]
/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.flow.impl;
17
18import com.google.common.base.Objects;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
22import com.google.common.util.concurrent.Futures;
23
24import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Modified;
28import org.apache.felix.scr.annotations.Property;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.onlab.util.KryoNamespace;
33import org.onlab.util.NewConcurrentHashMap;
34import org.onlab.util.Tools;
35import org.onosproject.cfg.ComponentConfigService;
36import org.onosproject.cluster.ClusterService;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.core.CoreService;
39import org.onosproject.core.IdGenerator;
40import org.onosproject.mastership.MastershipService;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.device.DeviceService;
43import org.onosproject.net.flow.CompletedBatchOperation;
44import org.onosproject.net.flow.DefaultFlowEntry;
45import org.onosproject.net.flow.FlowEntry;
46import org.onosproject.net.flow.FlowEntry.FlowEntryState;
47import org.onosproject.net.flow.FlowId;
48import org.onosproject.net.flow.FlowRule;
49import org.onosproject.net.flow.FlowRuleBatchEntry;
50import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
51import org.onosproject.net.flow.FlowRuleBatchEvent;
52import org.onosproject.net.flow.FlowRuleBatchOperation;
53import org.onosproject.net.flow.FlowRuleBatchRequest;
54import org.onosproject.net.flow.FlowRuleEvent;
55import org.onosproject.net.flow.FlowRuleEvent.Type;
56import org.onosproject.net.flow.FlowRuleService;
57import org.onosproject.net.flow.FlowRuleStore;
58import org.onosproject.net.flow.FlowRuleStoreDelegate;
59import org.onosproject.net.flow.StoredFlowEntry;
60import org.onosproject.store.AbstractStore;
61import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
62import org.onosproject.store.cluster.messaging.ClusterMessage;
63import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
64import org.onosproject.store.flow.ReplicaInfo;
Madan Jampanif7536ab2015-05-07 23:23:23 -070065import org.onosproject.store.flow.ReplicaInfoEvent;
66import org.onosproject.store.flow.ReplicaInfoEventListener;
Madan Jampani86940d92015-05-06 11:47:57 -070067import org.onosproject.store.flow.ReplicaInfoService;
68import org.onosproject.store.serializers.KryoSerializer;
69import org.onosproject.store.serializers.StoreSerializer;
70import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
71import org.osgi.service.component.ComponentContext;
72import org.slf4j.Logger;
73
Madan Jampani86940d92015-05-06 11:47:57 -070074import java.util.Collections;
75import java.util.Dictionary;
76import java.util.HashSet;
77import java.util.List;
78import java.util.Map;
79import java.util.Set;
80import java.util.concurrent.ConcurrentHashMap;
81import java.util.concurrent.ConcurrentMap;
82import java.util.concurrent.ExecutorService;
83import java.util.concurrent.Executors;
84import java.util.concurrent.ScheduledExecutorService;
Madan Jampani08bf17b2015-05-06 16:25:26 -070085import java.util.concurrent.ScheduledFuture;
Madan Jampani86940d92015-05-06 11:47:57 -070086import java.util.concurrent.TimeUnit;
87import java.util.concurrent.atomic.AtomicInteger;
88import java.util.stream.Collectors;
89
90import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
91import static com.google.common.base.Strings.isNullOrEmpty;
92import static org.onlab.util.Tools.get;
93import static org.onlab.util.Tools.groupedThreads;
94import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
95import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
96import static org.slf4j.LoggerFactory.getLogger;
97
98/**
99 * Manages inventory of flow rules using a distributed state management protocol.
100 */
101@Component(immediate = true, enabled = true)
102@Service
103public class NewDistributedFlowRuleStore
104 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
105 implements FlowRuleStore {
106
107 private final Logger log = getLogger(getClass());
108
109 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
110 private static final boolean DEFAULT_BACKUP_ENABLED = true;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700111 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
Madan Jampani86940d92015-05-06 11:47:57 -0700112 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
113
114 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
115 label = "Number of threads in the message handler pool")
116 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
117
118 @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
119 label = "Indicates whether backups are enabled or not")
120 private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
121
Madan Jampani08bf17b2015-05-06 16:25:26 -0700122 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
123 label = "Delay in ms between successive backup runs")
124 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
125
Madan Jampani86940d92015-05-06 11:47:57 -0700126 private InternalFlowTable flowTable = new InternalFlowTable();
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected ReplicaInfoService replicaInfoManager;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
132 protected ClusterCommunicationService clusterCommunicator;
133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
135 protected ClusterService clusterService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
138 protected DeviceService deviceService;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
141 protected CoreService coreService;
142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
144 protected ComponentConfigService configService;
145
146 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
147 protected MastershipService mastershipService;
148
149 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
150 private ExecutorService messageHandlingExecutor;
151
Madan Jampani08bf17b2015-05-06 16:25:26 -0700152 private ScheduledFuture<?> backupTask;
Madan Jampani86940d92015-05-06 11:47:57 -0700153 private final ScheduledExecutorService backupSenderExecutor =
154 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
155
156 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
157 @Override
158 protected void setupKryoPool() {
159 serializerPool = KryoNamespace.newBuilder()
160 .register(DistributedStoreSerializers.STORE_COMMON)
161 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
162 .register(FlowRuleEvent.class)
163 .register(FlowRuleEvent.Type.class)
164 .build();
165 }
166 };
167
168 private IdGenerator idGenerator;
169 private NodeId local;
170
171 @Activate
172 public void activate(ComponentContext context) {
173 configService.registerProperties(getClass());
174
175 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
176
177 local = clusterService.getLocalNode().id();
178
179 messageHandlingExecutor = Executors.newFixedThreadPool(
180 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
181
182 registerMessageHandlers(messageHandlingExecutor);
183
Madan Jampani08bf17b2015-05-06 16:25:26 -0700184 if (backupEnabled) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700185 replicaInfoManager.addListener(flowTable);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700186 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
187 flowTable::backup,
188 0,
189 backupPeriod,
190 TimeUnit.MILLISECONDS);
191 }
Madan Jampani86940d92015-05-06 11:47:57 -0700192
193 logConfig("Started");
194 }
195
196 @Deactivate
197 public void deactivate(ComponentContext context) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700198 if (backupEnabled) {
199 replicaInfoManager.removeListener(flowTable);
200 backupTask.cancel(true);
201 }
Madan Jampani86940d92015-05-06 11:47:57 -0700202 configService.unregisterProperties(getClass(), false);
203 unregisterMessageHandlers();
204 messageHandlingExecutor.shutdownNow();
205 backupSenderExecutor.shutdownNow();
206 log.info("Stopped");
207 }
208
209 @SuppressWarnings("rawtypes")
210 @Modified
211 public void modified(ComponentContext context) {
212 if (context == null) {
213 backupEnabled = DEFAULT_BACKUP_ENABLED;
214 logConfig("Default config");
215 return;
216 }
217
218 Dictionary properties = context.getProperties();
219 int newPoolSize;
220 boolean newBackupEnabled;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700221 int newBackupPeriod;
Madan Jampani86940d92015-05-06 11:47:57 -0700222 try {
223 String s = get(properties, "msgHandlerPoolSize");
224 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
225
226 s = get(properties, "backupEnabled");
227 newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
228
Madan Jampani08bf17b2015-05-06 16:25:26 -0700229 s = get(properties, "backupPeriod");
230 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
231
Madan Jampani86940d92015-05-06 11:47:57 -0700232 } catch (NumberFormatException | ClassCastException e) {
233 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
234 newBackupEnabled = DEFAULT_BACKUP_ENABLED;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700235 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Madan Jampani86940d92015-05-06 11:47:57 -0700236 }
237
Madan Jampani08bf17b2015-05-06 16:25:26 -0700238 boolean restartBackupTask = false;
Madan Jampani86940d92015-05-06 11:47:57 -0700239 if (newBackupEnabled != backupEnabled) {
240 backupEnabled = newBackupEnabled;
Madan Jampanif7536ab2015-05-07 23:23:23 -0700241 if (!backupEnabled) {
242 replicaInfoManager.removeListener(flowTable);
243 if (backupTask != null) {
244 backupTask.cancel(false);
245 backupTask = null;
246 }
247 } else {
248 replicaInfoManager.addListener(flowTable);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700249 }
250 restartBackupTask = backupEnabled;
251 }
252 if (newBackupPeriod != backupPeriod) {
253 backupPeriod = newBackupPeriod;
254 restartBackupTask = backupEnabled;
255 }
256 if (restartBackupTask) {
257 if (backupTask != null) {
258 // cancel previously running task
259 backupTask.cancel(false);
260 }
261 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
262 flowTable::backup,
263 0,
264 backupPeriod,
265 TimeUnit.MILLISECONDS);
Madan Jampani86940d92015-05-06 11:47:57 -0700266 }
267 if (newPoolSize != msgHandlerPoolSize) {
268 msgHandlerPoolSize = newPoolSize;
269 ExecutorService oldMsgHandler = messageHandlingExecutor;
270 messageHandlingExecutor = Executors.newFixedThreadPool(
271 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
272
273 // replace previously registered handlers.
274 registerMessageHandlers(messageHandlingExecutor);
275 oldMsgHandler.shutdown();
276 }
277 logConfig("Reconfigured");
278 }
279
280 private void registerMessageHandlers(ExecutorService executor) {
281
282 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
283 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
284 REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
285 clusterCommunicator.addSubscriber(
286 GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
287 clusterCommunicator.addSubscriber(
288 GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
289 clusterCommunicator.addSubscriber(
290 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
291 clusterCommunicator.addSubscriber(
292 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
293 clusterCommunicator.addSubscriber(
294 FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, executor);
295 }
296
297 private void unregisterMessageHandlers() {
298 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
299 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
300 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
301 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
302 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
303 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
304 }
305
306 private void logConfig(String prefix) {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700307 log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
308 prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
Madan Jampani86940d92015-05-06 11:47:57 -0700309 }
310
311 // This is not a efficient operation on a distributed sharded
312 // flow store. We need to revisit the need for this operation or at least
313 // make it device specific.
314 @Override
315 public int getFlowRuleCount() {
316 AtomicInteger sum = new AtomicInteger(0);
317 deviceService.getDevices().forEach(device -> sum.addAndGet(Iterables.size(getFlowEntries(device.id()))));
318 return sum.get();
319 }
320
321 @Override
322 public FlowEntry getFlowEntry(FlowRule rule) {
323
324 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
325 NodeId master = replicaInfo.master().orNull();
326
327 if (master == null) {
328 log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
329 return null;
330 }
331
332 if (Objects.equal(local, master)) {
333 return flowTable.getFlowEntry(rule);
334 }
335
336 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
337 master, rule.deviceId());
338
339 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
340 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
341 SERIALIZER::encode,
342 SERIALIZER::decode,
343 master),
344 FLOW_RULE_STORE_TIMEOUT_MILLIS,
345 TimeUnit.MILLISECONDS,
346 null);
347 }
348
349 @Override
350 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
351
352 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
353 NodeId master = replicaInfo.master().orNull();
354
355 if (master == null) {
356 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
357 return Collections.emptyList();
358 }
359
360 if (Objects.equal(local, master)) {
361 return flowTable.getFlowEntries(deviceId);
362 }
363
364 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
365 master, deviceId);
366
367 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
368 FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
369 SERIALIZER::encode,
370 SERIALIZER::decode,
371 master),
372 FLOW_RULE_STORE_TIMEOUT_MILLIS,
373 TimeUnit.MILLISECONDS,
374 Collections.emptyList());
375 }
376
377 @Override
378 public void storeFlowRule(FlowRule rule) {
379 storeBatch(new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700380 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
Madan Jampani86940d92015-05-06 11:47:57 -0700381 rule.deviceId(), idGenerator.getNewId()));
382 }
383
384 @Override
385 public void storeBatch(FlowRuleBatchOperation operation) {
386 if (operation.getOperations().isEmpty()) {
387 notifyDelegate(FlowRuleBatchEvent.completed(
388 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
389 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
390 return;
391 }
392
393 DeviceId deviceId = operation.deviceId();
394
395 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
396 NodeId master = replicaInfo.master().orNull();
397
398 if (master == null) {
399 log.warn("No master for {} : flows will be marked for removal", deviceId);
400
401 updateStoreInternal(operation);
402
403 notifyDelegate(FlowRuleBatchEvent.completed(
404 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
405 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
406 return;
407 }
408
409 if (Objects.equal(local, master)) {
410 storeBatchInternal(operation);
411 return;
412 }
413
414 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
415 master, deviceId);
416
417 if (!clusterCommunicator.unicast(operation,
418 APPLY_BATCH_FLOWS,
419 SERIALIZER::encode,
420 master)) {
421 log.warn("Failed to storeBatch: {} to {}", operation, replicaInfo.master());
422
423 Set<FlowRule> allFailures = operation.getOperations().stream()
424 .map(op -> op.target())
425 .collect(Collectors.toSet());
426
427 notifyDelegate(FlowRuleBatchEvent.completed(
428 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
429 new CompletedBatchOperation(false, allFailures, deviceId)));
430 return;
431 }
432 }
433
434 private void storeBatchInternal(FlowRuleBatchOperation operation) {
435
436 final DeviceId did = operation.deviceId();
437 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
438 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
439 if (currentOps.isEmpty()) {
440 batchOperationComplete(FlowRuleBatchEvent.completed(
441 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
442 new CompletedBatchOperation(true, Collections.emptySet(), did)));
443 return;
444 }
445
446 notifyDelegate(FlowRuleBatchEvent.requested(new
447 FlowRuleBatchRequest(operation.id(),
448 currentOps), operation.deviceId()));
449 }
450
451 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
452 return operation.getOperations().stream().map(
453 op -> {
454 StoredFlowEntry entry;
455 switch (op.operator()) {
456 case ADD:
457 entry = new DefaultFlowEntry(op.target());
458 // always add requested FlowRule
459 // Note: 2 equal FlowEntry may have different treatment
460 flowTable.remove(entry.deviceId(), entry);
461 flowTable.add(entry);
462
463 return op;
464 case REMOVE:
465 entry = flowTable.getFlowEntry(op.target());
466 if (entry != null) {
467 entry.setState(FlowEntryState.PENDING_REMOVE);
468 return op;
469 }
470 break;
471 case MODIFY:
472 //TODO: figure this out at some point
473 break;
474 default:
475 log.warn("Unknown flow operation operator: {}", op.operator());
476 }
477 return null;
478 }
479 ).filter(op -> op != null).collect(Collectors.toSet());
480 }
481
482 @Override
483 public void deleteFlowRule(FlowRule rule) {
484 storeBatch(
485 new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700486 Collections.singletonList(
Madan Jampani86940d92015-05-06 11:47:57 -0700487 new FlowRuleBatchEntry(
488 FlowRuleOperation.REMOVE,
489 rule)), rule.deviceId(), idGenerator.getNewId()));
490 }
491
492 @Override
493 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
494 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
495 if (Objects.equal(local, replicaInfo.master().orNull())) {
496 return addOrUpdateFlowRuleInternal(rule);
497 }
498
499 log.warn("Tried to update FlowRule {} state,"
500 + " while the Node was not the master.", rule);
501 return null;
502 }
503
504 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
505 // check if this new rule is an update to an existing entry
506 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
507 if (stored != null) {
508 stored.setBytes(rule.bytes());
509 stored.setLife(rule.life());
510 stored.setPackets(rule.packets());
511 if (stored.state() == FlowEntryState.PENDING_ADD) {
512 stored.setState(FlowEntryState.ADDED);
513 return new FlowRuleEvent(Type.RULE_ADDED, rule);
514 }
515 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
516 }
517
518 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
519 // TODO: also update backup if the behavior is correct.
520 flowTable.add(rule);
521 return null;
522 }
523
524 @Override
525 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
526 final DeviceId deviceId = rule.deviceId();
527 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
528 NodeId master = replicaInfo.master().orNull();
529
530 if (Objects.equal(local, master)) {
531 // bypass and handle it locally
532 return removeFlowRuleInternal(rule);
533 }
534
535 if (master == null) {
536 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
537 // TODO: revisit if this should be null (="no-op") or Exception
538 return null;
539 }
540
541 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
542 master, deviceId);
543
544 return Futures.get(clusterCommunicator.sendAndReceive(
545 rule,
546 REMOVE_FLOW_ENTRY,
547 SERIALIZER::encode,
548 SERIALIZER::decode,
549 master),
550 FLOW_RULE_STORE_TIMEOUT_MILLIS,
551 TimeUnit.MILLISECONDS,
552 RuntimeException.class);
553 }
554
555 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
556 final DeviceId deviceId = rule.deviceId();
557 // This is where one could mark a rule as removed and still keep it in the store.
558 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
559 return removed ? new FlowRuleEvent(RULE_REMOVED, rule) : null;
560 }
561
562 @Override
563 public void batchOperationComplete(FlowRuleBatchEvent event) {
564 //FIXME: need a per device pending response
565 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
566 if (nodeId == null) {
567 notifyDelegate(event);
568 } else {
569 // TODO check unicast return value
570 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
571 //error log: log.warn("Failed to respond to peer for batch operation result");
572 }
573 }
574
575 private final class OnStoreBatch implements ClusterMessageHandler {
576
577 @Override
578 public void handle(final ClusterMessage message) {
579 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
580 log.debug("received batch request {}", operation);
581
582 final DeviceId deviceId = operation.deviceId();
583 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
584 if (!local.equals(replicaInfo.master().orNull())) {
585
586 Set<FlowRule> failures = new HashSet<>(operation.size());
587 for (FlowRuleBatchEntry op : operation.getOperations()) {
588 failures.add(op.target());
589 }
590 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
591 // This node is no longer the master, respond as all failed.
592 // TODO: we might want to wrap response in envelope
593 // to distinguish sw programming failure and hand over
594 // it make sense in the latter case to retry immediately.
595 message.respond(SERIALIZER.encode(allFailed));
596 return;
597 }
598
599 pendingResponses.put(operation.id(), message.sender());
600 storeBatchInternal(operation);
601 }
602 }
603
Madan Jampanif7536ab2015-05-07 23:23:23 -0700604 private class InternalFlowTable implements ReplicaInfoEventListener {
Madan Jampani86940d92015-05-06 11:47:57 -0700605
606 private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
607 flowEntries = new ConcurrentHashMap<>();
608
609 private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
610 private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
611 private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
612
613 private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
614 return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
615 }
616
Madan Jampanif7536ab2015-05-07 23:23:23 -0700617 @Override
618 public void event(ReplicaInfoEvent event) {
619 if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
620 DeviceId deviceId = event.subject();
621 if (!Objects.equal(local, replicaInfoManager.getReplicaInfoFor(deviceId).master())) {
622 // ignore since this event is for a device this node does not manage.
623 return;
624 }
625 NodeId latestBackupNode = getBackupNode(deviceId);
626 NodeId existingBackupNode = lastBackupNodes.get(deviceId);
627 if (Objects.equal(latestBackupNode, existingBackupNode)) {
628 // ignore since backup location hasn't changed.
629 return;
630 }
631 backupFlowEntries(latestBackupNode, Sets.newHashSet(deviceId));
632 }
633 }
634
635 private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
636 log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
637 Map<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
638 Maps.newConcurrentMap();
639 flowEntries.forEach((key, value) -> {
640 if (deviceIds.contains(key)) {
641 deviceFlowEntries.put(key, value);
642 }
643 });
644 clusterCommunicator.unicast(deviceFlowEntries,
645 FLOW_TABLE_BACKUP,
646 SERIALIZER::encode,
647 nodeId);
648 deviceIds.forEach(id -> {
649 lastBackupTimes.put(id, System.currentTimeMillis());
650 lastBackupNodes.put(id, nodeId);
651 });
652 }
653
Madan Jampani86940d92015-05-06 11:47:57 -0700654 /**
655 * Returns the flow table for specified device.
656 *
657 * @param deviceId identifier of the device
658 * @return Map representing Flow Table of given device.
659 */
660 private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
661 return createIfAbsentUnchecked(flowEntries, deviceId, lazyEmptyFlowTable());
662 }
663
664 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
665 return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
666 }
667
668 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
669 Set<StoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.deviceId(), rule.id());
670 return flowEntries.stream()
671 .filter(entry -> Objects.equal(entry, rule))
672 .findAny()
673 .orElse(null);
674 }
675
676 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
677 Set<FlowEntry> result = Sets.newHashSet();
678 getFlowTable(deviceId).values().forEach(result::addAll);
679 return result;
680 }
681
682 public StoredFlowEntry getFlowEntry(FlowRule rule) {
683 return getFlowEntryInternal(rule);
684 }
685
686 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
687 return getFlowEntriesInternal(deviceId);
688 }
689
690 public void add(FlowEntry rule) {
691 getFlowEntriesInternal(rule.deviceId(), rule.id()).add((StoredFlowEntry) rule);
692 lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
693 }
694
695 public boolean remove(DeviceId deviceId, FlowEntry rule) {
696 try {
697 return getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
698 } finally {
699 lastUpdateTimes.put(deviceId, System.currentTimeMillis());
700 }
701 }
702
703 private NodeId getBackupNode(DeviceId deviceId) {
704 List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
705 // pick the standby which is most likely to become next master
706 return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
707 }
708
709 private void backup() {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700710 if (!backupEnabled) {
711 return;
712 }
Madan Jampani86940d92015-05-06 11:47:57 -0700713 try {
714 // determine the set of devices that we need to backup during this run.
715 Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
716 .stream()
717 .filter(deviceId -> {
718 Long lastBackupTime = lastBackupTimes.get(deviceId);
719 Long lastUpdateTime = lastUpdateTimes.get(deviceId);
720 NodeId lastBackupNode = lastBackupNodes.get(deviceId);
721 return lastBackupTime == null
722 || !Objects.equal(lastBackupNode, getBackupNode(deviceId))
723 || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
724 })
725 .collect(Collectors.toSet());
726
727 // compute a mapping from node to the set of devices whose flow entries it should backup
728 Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
729 devicesToBackup.forEach(deviceId -> {
730 NodeId backupLocation = getBackupNode(deviceId);
731 if (backupLocation != null) {
732 devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
733 .add(deviceId);
734 }
735 });
Madan Jampani86940d92015-05-06 11:47:57 -0700736 // send the device flow entries to their respective backup nodes
Madan Jampanif7536ab2015-05-07 23:23:23 -0700737 devicesToBackupByNode.forEach(this::backupFlowEntries);
Madan Jampani86940d92015-05-06 11:47:57 -0700738 } catch (Exception e) {
739 log.error("Backup failed.", e);
740 }
741 }
742
743 private void onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700744 log.debug("Received flows for {} to backup", flowTables.keySet());
Madan Jampani86940d92015-05-06 11:47:57 -0700745 Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700746 // Only process those devices are that not managed by the local node.
747 Maps.filterKeys(flowTables, deviceId -> !managedDevices.contains(deviceId))
748 .forEach((deviceId, flowTable) -> {
749 Map<FlowId, Set<StoredFlowEntry>> deviceFlowTable = getFlowTable(deviceId);
750 deviceFlowTable.clear();
751 deviceFlowTable.putAll(flowTable);
752 });
Madan Jampani86940d92015-05-06 11:47:57 -0700753 }
754 }
755}