blob: 828b5a07323ec6405b454a5024b3c6b7850de1fe [file] [log] [blame]
/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.flow.impl;
17
18import com.google.common.base.Objects;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
22import com.google.common.util.concurrent.Futures;
23
24import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Modified;
28import org.apache.felix.scr.annotations.Property;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.onlab.util.KryoNamespace;
33import org.onlab.util.NewConcurrentHashMap;
34import org.onlab.util.Tools;
35import org.onosproject.cfg.ComponentConfigService;
36import org.onosproject.cluster.ClusterService;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.core.CoreService;
39import org.onosproject.core.IdGenerator;
40import org.onosproject.mastership.MastershipService;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.device.DeviceService;
43import org.onosproject.net.flow.CompletedBatchOperation;
44import org.onosproject.net.flow.DefaultFlowEntry;
45import org.onosproject.net.flow.FlowEntry;
46import org.onosproject.net.flow.FlowEntry.FlowEntryState;
47import org.onosproject.net.flow.FlowId;
48import org.onosproject.net.flow.FlowRule;
49import org.onosproject.net.flow.FlowRuleBatchEntry;
50import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
51import org.onosproject.net.flow.FlowRuleBatchEvent;
52import org.onosproject.net.flow.FlowRuleBatchOperation;
53import org.onosproject.net.flow.FlowRuleBatchRequest;
54import org.onosproject.net.flow.FlowRuleEvent;
55import org.onosproject.net.flow.FlowRuleEvent.Type;
56import org.onosproject.net.flow.FlowRuleService;
57import org.onosproject.net.flow.FlowRuleStore;
58import org.onosproject.net.flow.FlowRuleStoreDelegate;
59import org.onosproject.net.flow.StoredFlowEntry;
60import org.onosproject.store.AbstractStore;
61import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
62import org.onosproject.store.cluster.messaging.ClusterMessage;
63import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Madan Jampanif7536ab2015-05-07 23:23:23 -070064import org.onosproject.store.flow.ReplicaInfoEvent;
65import org.onosproject.store.flow.ReplicaInfoEventListener;
Madan Jampani86940d92015-05-06 11:47:57 -070066import org.onosproject.store.flow.ReplicaInfoService;
67import org.onosproject.store.serializers.KryoSerializer;
68import org.onosproject.store.serializers.StoreSerializer;
69import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
70import org.osgi.service.component.ComponentContext;
71import org.slf4j.Logger;
72
Madan Jampani86940d92015-05-06 11:47:57 -070073import java.util.Collections;
74import java.util.Dictionary;
75import java.util.HashSet;
76import java.util.List;
77import java.util.Map;
78import java.util.Set;
79import java.util.concurrent.ConcurrentHashMap;
80import java.util.concurrent.ConcurrentMap;
81import java.util.concurrent.ExecutorService;
82import java.util.concurrent.Executors;
83import java.util.concurrent.ScheduledExecutorService;
Madan Jampani08bf17b2015-05-06 16:25:26 -070084import java.util.concurrent.ScheduledFuture;
Madan Jampani86940d92015-05-06 11:47:57 -070085import java.util.concurrent.TimeUnit;
86import java.util.concurrent.atomic.AtomicInteger;
87import java.util.stream.Collectors;
88
89import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
90import static com.google.common.base.Strings.isNullOrEmpty;
91import static org.onlab.util.Tools.get;
92import static org.onlab.util.Tools.groupedThreads;
93import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
94import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
95import static org.slf4j.LoggerFactory.getLogger;
96
97/**
98 * Manages inventory of flow rules using a distributed state management protocol.
99 */
100@Component(immediate = true, enabled = true)
101@Service
102public class NewDistributedFlowRuleStore
103 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
104 implements FlowRuleStore {
105
106 private final Logger log = getLogger(getClass());
107
108 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
109 private static final boolean DEFAULT_BACKUP_ENABLED = true;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700110 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
Madan Jampani86940d92015-05-06 11:47:57 -0700111 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
112
113 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
114 label = "Number of threads in the message handler pool")
115 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
116
117 @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
118 label = "Indicates whether backups are enabled or not")
119 private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
120
Madan Jampani08bf17b2015-05-06 16:25:26 -0700121 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
122 label = "Delay in ms between successive backup runs")
123 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
124
Madan Jampani86940d92015-05-06 11:47:57 -0700125 private InternalFlowTable flowTable = new InternalFlowTable();
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 protected ReplicaInfoService replicaInfoManager;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected ClusterCommunicationService clusterCommunicator;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
134 protected ClusterService clusterService;
135
136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 protected DeviceService deviceService;
138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140 protected CoreService coreService;
141
142 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
143 protected ComponentConfigService configService;
144
145 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
146 protected MastershipService mastershipService;
147
148 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
149 private ExecutorService messageHandlingExecutor;
150
Madan Jampani08bf17b2015-05-06 16:25:26 -0700151 private ScheduledFuture<?> backupTask;
Madan Jampani86940d92015-05-06 11:47:57 -0700152 private final ScheduledExecutorService backupSenderExecutor =
153 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
154
155 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
156 @Override
157 protected void setupKryoPool() {
158 serializerPool = KryoNamespace.newBuilder()
159 .register(DistributedStoreSerializers.STORE_COMMON)
160 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
161 .register(FlowRuleEvent.class)
162 .register(FlowRuleEvent.Type.class)
163 .build();
164 }
165 };
166
167 private IdGenerator idGenerator;
168 private NodeId local;
169
170 @Activate
171 public void activate(ComponentContext context) {
172 configService.registerProperties(getClass());
173
174 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
175
176 local = clusterService.getLocalNode().id();
177
178 messageHandlingExecutor = Executors.newFixedThreadPool(
179 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
180
181 registerMessageHandlers(messageHandlingExecutor);
182
Madan Jampani08bf17b2015-05-06 16:25:26 -0700183 if (backupEnabled) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700184 replicaInfoManager.addListener(flowTable);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700185 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
186 flowTable::backup,
187 0,
188 backupPeriod,
189 TimeUnit.MILLISECONDS);
190 }
Madan Jampani86940d92015-05-06 11:47:57 -0700191
192 logConfig("Started");
193 }
194
195 @Deactivate
196 public void deactivate(ComponentContext context) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700197 if (backupEnabled) {
198 replicaInfoManager.removeListener(flowTable);
199 backupTask.cancel(true);
200 }
Madan Jampani86940d92015-05-06 11:47:57 -0700201 configService.unregisterProperties(getClass(), false);
202 unregisterMessageHandlers();
203 messageHandlingExecutor.shutdownNow();
204 backupSenderExecutor.shutdownNow();
205 log.info("Stopped");
206 }
207
208 @SuppressWarnings("rawtypes")
209 @Modified
210 public void modified(ComponentContext context) {
211 if (context == null) {
212 backupEnabled = DEFAULT_BACKUP_ENABLED;
213 logConfig("Default config");
214 return;
215 }
216
217 Dictionary properties = context.getProperties();
218 int newPoolSize;
219 boolean newBackupEnabled;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700220 int newBackupPeriod;
Madan Jampani86940d92015-05-06 11:47:57 -0700221 try {
222 String s = get(properties, "msgHandlerPoolSize");
223 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
224
225 s = get(properties, "backupEnabled");
226 newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
227
Madan Jampani08bf17b2015-05-06 16:25:26 -0700228 s = get(properties, "backupPeriod");
229 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
230
Madan Jampani86940d92015-05-06 11:47:57 -0700231 } catch (NumberFormatException | ClassCastException e) {
232 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
233 newBackupEnabled = DEFAULT_BACKUP_ENABLED;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700234 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Madan Jampani86940d92015-05-06 11:47:57 -0700235 }
236
Madan Jampani08bf17b2015-05-06 16:25:26 -0700237 boolean restartBackupTask = false;
Madan Jampani86940d92015-05-06 11:47:57 -0700238 if (newBackupEnabled != backupEnabled) {
239 backupEnabled = newBackupEnabled;
Madan Jampanif7536ab2015-05-07 23:23:23 -0700240 if (!backupEnabled) {
241 replicaInfoManager.removeListener(flowTable);
242 if (backupTask != null) {
243 backupTask.cancel(false);
244 backupTask = null;
245 }
246 } else {
247 replicaInfoManager.addListener(flowTable);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700248 }
249 restartBackupTask = backupEnabled;
250 }
251 if (newBackupPeriod != backupPeriod) {
252 backupPeriod = newBackupPeriod;
253 restartBackupTask = backupEnabled;
254 }
255 if (restartBackupTask) {
256 if (backupTask != null) {
257 // cancel previously running task
258 backupTask.cancel(false);
259 }
260 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
261 flowTable::backup,
262 0,
263 backupPeriod,
264 TimeUnit.MILLISECONDS);
Madan Jampani86940d92015-05-06 11:47:57 -0700265 }
266 if (newPoolSize != msgHandlerPoolSize) {
267 msgHandlerPoolSize = newPoolSize;
268 ExecutorService oldMsgHandler = messageHandlingExecutor;
269 messageHandlingExecutor = Executors.newFixedThreadPool(
270 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
271
272 // replace previously registered handlers.
273 registerMessageHandlers(messageHandlingExecutor);
274 oldMsgHandler.shutdown();
275 }
276 logConfig("Reconfigured");
277 }
278
279 private void registerMessageHandlers(ExecutorService executor) {
280
281 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
282 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
283 REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
284 clusterCommunicator.addSubscriber(
285 GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
286 clusterCommunicator.addSubscriber(
287 GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
288 clusterCommunicator.addSubscriber(
289 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
290 clusterCommunicator.addSubscriber(
291 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
292 clusterCommunicator.addSubscriber(
293 FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, executor);
294 }
295
296 private void unregisterMessageHandlers() {
297 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
298 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
299 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
300 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
301 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
302 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
303 }
304
305 private void logConfig(String prefix) {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700306 log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
307 prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
Madan Jampani86940d92015-05-06 11:47:57 -0700308 }
309
310 // This is not a efficient operation on a distributed sharded
311 // flow store. We need to revisit the need for this operation or at least
312 // make it device specific.
313 @Override
314 public int getFlowRuleCount() {
315 AtomicInteger sum = new AtomicInteger(0);
316 deviceService.getDevices().forEach(device -> sum.addAndGet(Iterables.size(getFlowEntries(device.id()))));
317 return sum.get();
318 }
319
320 @Override
321 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700322 NodeId master = mastershipService.getMasterFor(rule.deviceId());
Madan Jampani86940d92015-05-06 11:47:57 -0700323
324 if (master == null) {
325 log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
326 return null;
327 }
328
329 if (Objects.equal(local, master)) {
330 return flowTable.getFlowEntry(rule);
331 }
332
333 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
334 master, rule.deviceId());
335
336 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
337 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
338 SERIALIZER::encode,
339 SERIALIZER::decode,
340 master),
341 FLOW_RULE_STORE_TIMEOUT_MILLIS,
342 TimeUnit.MILLISECONDS,
343 null);
344 }
345
346 @Override
347 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700348 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700349
350 if (master == null) {
351 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
352 return Collections.emptyList();
353 }
354
355 if (Objects.equal(local, master)) {
356 return flowTable.getFlowEntries(deviceId);
357 }
358
359 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
360 master, deviceId);
361
362 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
363 FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
364 SERIALIZER::encode,
365 SERIALIZER::decode,
366 master),
367 FLOW_RULE_STORE_TIMEOUT_MILLIS,
368 TimeUnit.MILLISECONDS,
369 Collections.emptyList());
370 }
371
372 @Override
373 public void storeFlowRule(FlowRule rule) {
374 storeBatch(new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700375 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
Madan Jampani86940d92015-05-06 11:47:57 -0700376 rule.deviceId(), idGenerator.getNewId()));
377 }
378
379 @Override
380 public void storeBatch(FlowRuleBatchOperation operation) {
381 if (operation.getOperations().isEmpty()) {
382 notifyDelegate(FlowRuleBatchEvent.completed(
383 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
384 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
385 return;
386 }
387
388 DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700389 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700390
391 if (master == null) {
392 log.warn("No master for {} : flows will be marked for removal", deviceId);
393
394 updateStoreInternal(operation);
395
396 notifyDelegate(FlowRuleBatchEvent.completed(
397 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
398 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
399 return;
400 }
401
402 if (Objects.equal(local, master)) {
403 storeBatchInternal(operation);
404 return;
405 }
406
407 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
408 master, deviceId);
409
410 if (!clusterCommunicator.unicast(operation,
411 APPLY_BATCH_FLOWS,
412 SERIALIZER::encode,
413 master)) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700414 log.warn("Failed to storeBatch: {} to {}", operation, master);
Madan Jampani86940d92015-05-06 11:47:57 -0700415
416 Set<FlowRule> allFailures = operation.getOperations().stream()
417 .map(op -> op.target())
418 .collect(Collectors.toSet());
419
420 notifyDelegate(FlowRuleBatchEvent.completed(
421 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
422 new CompletedBatchOperation(false, allFailures, deviceId)));
423 return;
424 }
425 }
426
427 private void storeBatchInternal(FlowRuleBatchOperation operation) {
428
429 final DeviceId did = operation.deviceId();
430 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
431 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
432 if (currentOps.isEmpty()) {
433 batchOperationComplete(FlowRuleBatchEvent.completed(
434 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
435 new CompletedBatchOperation(true, Collections.emptySet(), did)));
436 return;
437 }
438
439 notifyDelegate(FlowRuleBatchEvent.requested(new
440 FlowRuleBatchRequest(operation.id(),
441 currentOps), operation.deviceId()));
442 }
443
444 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
445 return operation.getOperations().stream().map(
446 op -> {
447 StoredFlowEntry entry;
448 switch (op.operator()) {
449 case ADD:
450 entry = new DefaultFlowEntry(op.target());
451 // always add requested FlowRule
452 // Note: 2 equal FlowEntry may have different treatment
453 flowTable.remove(entry.deviceId(), entry);
454 flowTable.add(entry);
455
456 return op;
457 case REMOVE:
458 entry = flowTable.getFlowEntry(op.target());
459 if (entry != null) {
460 entry.setState(FlowEntryState.PENDING_REMOVE);
461 return op;
462 }
463 break;
464 case MODIFY:
465 //TODO: figure this out at some point
466 break;
467 default:
468 log.warn("Unknown flow operation operator: {}", op.operator());
469 }
470 return null;
471 }
472 ).filter(op -> op != null).collect(Collectors.toSet());
473 }
474
475 @Override
476 public void deleteFlowRule(FlowRule rule) {
477 storeBatch(
478 new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700479 Collections.singletonList(
Madan Jampani86940d92015-05-06 11:47:57 -0700480 new FlowRuleBatchEntry(
481 FlowRuleOperation.REMOVE,
482 rule)), rule.deviceId(), idGenerator.getNewId()));
483 }
484
485 @Override
486 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700487 NodeId master = mastershipService.getMasterFor(rule.deviceId());
488 if (Objects.equal(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700489 return addOrUpdateFlowRuleInternal(rule);
490 }
491
492 log.warn("Tried to update FlowRule {} state,"
493 + " while the Node was not the master.", rule);
494 return null;
495 }
496
497 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
498 // check if this new rule is an update to an existing entry
499 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
500 if (stored != null) {
501 stored.setBytes(rule.bytes());
502 stored.setLife(rule.life());
503 stored.setPackets(rule.packets());
504 if (stored.state() == FlowEntryState.PENDING_ADD) {
505 stored.setState(FlowEntryState.ADDED);
506 return new FlowRuleEvent(Type.RULE_ADDED, rule);
507 }
508 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
509 }
510
511 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
512 // TODO: also update backup if the behavior is correct.
513 flowTable.add(rule);
514 return null;
515 }
516
517 @Override
518 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
519 final DeviceId deviceId = rule.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700520 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700521
522 if (Objects.equal(local, master)) {
523 // bypass and handle it locally
524 return removeFlowRuleInternal(rule);
525 }
526
527 if (master == null) {
528 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
529 // TODO: revisit if this should be null (="no-op") or Exception
530 return null;
531 }
532
533 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
534 master, deviceId);
535
536 return Futures.get(clusterCommunicator.sendAndReceive(
537 rule,
538 REMOVE_FLOW_ENTRY,
539 SERIALIZER::encode,
540 SERIALIZER::decode,
541 master),
542 FLOW_RULE_STORE_TIMEOUT_MILLIS,
543 TimeUnit.MILLISECONDS,
544 RuntimeException.class);
545 }
546
547 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
548 final DeviceId deviceId = rule.deviceId();
549 // This is where one could mark a rule as removed and still keep it in the store.
550 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
551 return removed ? new FlowRuleEvent(RULE_REMOVED, rule) : null;
552 }
553
554 @Override
555 public void batchOperationComplete(FlowRuleBatchEvent event) {
556 //FIXME: need a per device pending response
557 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
558 if (nodeId == null) {
559 notifyDelegate(event);
560 } else {
561 // TODO check unicast return value
562 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
563 //error log: log.warn("Failed to respond to peer for batch operation result");
564 }
565 }
566
567 private final class OnStoreBatch implements ClusterMessageHandler {
568
569 @Override
570 public void handle(final ClusterMessage message) {
571 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
572 log.debug("received batch request {}", operation);
573
574 final DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700575 NodeId master = mastershipService.getMasterFor(deviceId);
576 if (!Objects.equal(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700577 Set<FlowRule> failures = new HashSet<>(operation.size());
578 for (FlowRuleBatchEntry op : operation.getOperations()) {
579 failures.add(op.target());
580 }
581 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
582 // This node is no longer the master, respond as all failed.
583 // TODO: we might want to wrap response in envelope
584 // to distinguish sw programming failure and hand over
585 // it make sense in the latter case to retry immediately.
586 message.respond(SERIALIZER.encode(allFailed));
587 return;
588 }
589
590 pendingResponses.put(operation.id(), message.sender());
591 storeBatchInternal(operation);
592 }
593 }
594
Madan Jampanif7536ab2015-05-07 23:23:23 -0700595 private class InternalFlowTable implements ReplicaInfoEventListener {
Madan Jampani86940d92015-05-06 11:47:57 -0700596
597 private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
598 flowEntries = new ConcurrentHashMap<>();
599
600 private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
601 private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
602 private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
603
604 private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
605 return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
606 }
607
Madan Jampanif7536ab2015-05-07 23:23:23 -0700608 @Override
609 public void event(ReplicaInfoEvent event) {
610 if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
611 DeviceId deviceId = event.subject();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700612 NodeId master = mastershipService.getMasterFor(deviceId);
613 if (!Objects.equal(local, master)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700614 // ignore since this event is for a device this node does not manage.
615 return;
616 }
Madan Jampani7267c552015-05-20 22:39:17 -0700617 NodeId newBackupNode = getBackupNode(deviceId);
618 NodeId currentBackupNode = lastBackupNodes.get(deviceId);
619 if (Objects.equal(newBackupNode, currentBackupNode)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700620 // ignore since backup location hasn't changed.
621 return;
622 }
Madan Jampani7267c552015-05-20 22:39:17 -0700623 if (currentBackupNode != null && newBackupNode == null) {
624 // Current backup node is most likely down and no alternate backup node
625 // has been chosen. Clear current backup location so that we can resume
626 // backups when either current backup comes online or a different backup node
627 // is chosen.
628 log.warn("Lost backup location {} for deviceId {} and no alternate backup node exists. "
629 + "Flows can be lost if the master goes down", currentBackupNode, deviceId);
630 lastBackupNodes.remove(deviceId);
631 lastBackupTimes.remove(deviceId);
632 return;
633 // TODO: Pick any available node as backup and ensure hand-off occurs when
634 // a new master is elected.
635 }
636 log.info("Backup location for {} has changed from {} to {}.",
637 deviceId, currentBackupNode, newBackupNode);
638 backupSenderExecutor.schedule(() -> backupFlowEntries(newBackupNode, Sets.newHashSet(deviceId)),
Madan Jampani03062682015-05-19 17:57:51 -0700639 0,
640 TimeUnit.SECONDS);
Madan Jampanif7536ab2015-05-07 23:23:23 -0700641 }
642 }
643
644 private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
645 log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
646 Map<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
647 Maps.newConcurrentMap();
648 flowEntries.forEach((key, value) -> {
649 if (deviceIds.contains(key)) {
650 deviceFlowEntries.put(key, value);
651 }
652 });
653 clusterCommunicator.unicast(deviceFlowEntries,
654 FLOW_TABLE_BACKUP,
655 SERIALIZER::encode,
656 nodeId);
657 deviceIds.forEach(id -> {
658 lastBackupTimes.put(id, System.currentTimeMillis());
659 lastBackupNodes.put(id, nodeId);
660 });
661 }
662
Madan Jampani86940d92015-05-06 11:47:57 -0700663 /**
664 * Returns the flow table for specified device.
665 *
666 * @param deviceId identifier of the device
667 * @return Map representing Flow Table of given device.
668 */
669 private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
670 return createIfAbsentUnchecked(flowEntries, deviceId, lazyEmptyFlowTable());
671 }
672
673 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
674 return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
675 }
676
677 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
678 Set<StoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.deviceId(), rule.id());
679 return flowEntries.stream()
680 .filter(entry -> Objects.equal(entry, rule))
681 .findAny()
682 .orElse(null);
683 }
684
685 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
686 Set<FlowEntry> result = Sets.newHashSet();
687 getFlowTable(deviceId).values().forEach(result::addAll);
688 return result;
689 }
690
691 public StoredFlowEntry getFlowEntry(FlowRule rule) {
692 return getFlowEntryInternal(rule);
693 }
694
695 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
696 return getFlowEntriesInternal(deviceId);
697 }
698
699 public void add(FlowEntry rule) {
700 getFlowEntriesInternal(rule.deviceId(), rule.id()).add((StoredFlowEntry) rule);
701 lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
702 }
703
704 public boolean remove(DeviceId deviceId, FlowEntry rule) {
705 try {
706 return getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
707 } finally {
708 lastUpdateTimes.put(deviceId, System.currentTimeMillis());
709 }
710 }
711
712 private NodeId getBackupNode(DeviceId deviceId) {
713 List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
714 // pick the standby which is most likely to become next master
715 return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
716 }
717
718 private void backup() {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700719 if (!backupEnabled) {
720 return;
721 }
Madan Jampani86940d92015-05-06 11:47:57 -0700722 try {
723 // determine the set of devices that we need to backup during this run.
724 Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
725 .stream()
726 .filter(deviceId -> {
727 Long lastBackupTime = lastBackupTimes.get(deviceId);
728 Long lastUpdateTime = lastUpdateTimes.get(deviceId);
729 NodeId lastBackupNode = lastBackupNodes.get(deviceId);
Madan Jampani7267c552015-05-20 22:39:17 -0700730 NodeId newBackupNode = getBackupNode(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700731 return lastBackupTime == null
Madan Jampani7267c552015-05-20 22:39:17 -0700732 || !Objects.equal(lastBackupNode, newBackupNode)
Madan Jampani86940d92015-05-06 11:47:57 -0700733 || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
734 })
735 .collect(Collectors.toSet());
736
737 // compute a mapping from node to the set of devices whose flow entries it should backup
738 Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
739 devicesToBackup.forEach(deviceId -> {
740 NodeId backupLocation = getBackupNode(deviceId);
741 if (backupLocation != null) {
742 devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
743 .add(deviceId);
744 }
745 });
Madan Jampani86940d92015-05-06 11:47:57 -0700746 // send the device flow entries to their respective backup nodes
Madan Jampanif7536ab2015-05-07 23:23:23 -0700747 devicesToBackupByNode.forEach(this::backupFlowEntries);
Madan Jampani86940d92015-05-06 11:47:57 -0700748 } catch (Exception e) {
749 log.error("Backup failed.", e);
750 }
751 }
752
753 private void onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
Madan Jampani7267c552015-05-20 22:39:17 -0700754 log.debug("Received flowEntries for {} to backup", flowTables.keySet());
Madan Jampani86940d92015-05-06 11:47:57 -0700755 Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700756 // Only process those devices are that not managed by the local node.
757 Maps.filterKeys(flowTables, deviceId -> !managedDevices.contains(deviceId))
758 .forEach((deviceId, flowTable) -> {
759 Map<FlowId, Set<StoredFlowEntry>> deviceFlowTable = getFlowTable(deviceId);
760 deviceFlowTable.clear();
761 deviceFlowTable.putAll(flowTable);
762 });
Madan Jampani86940d92015-05-06 11:47:57 -0700763 }
764 }
765}