blob: 249cdf89036056fb07e1a2306d1c16cc66467410 [file] [log] [blame]
Madan Jampani86940d92015-05-06 11:47:57 -07001 /*
2 * Copyright 2014-2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
18import com.google.common.base.Objects;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
22import com.google.common.util.concurrent.Futures;
23
24import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Modified;
28import org.apache.felix.scr.annotations.Property;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.onlab.util.KryoNamespace;
33import org.onlab.util.NewConcurrentHashMap;
34import org.onlab.util.Tools;
35import org.onosproject.cfg.ComponentConfigService;
36import org.onosproject.cluster.ClusterService;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.core.CoreService;
39import org.onosproject.core.IdGenerator;
40import org.onosproject.mastership.MastershipService;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.device.DeviceService;
43import org.onosproject.net.flow.CompletedBatchOperation;
44import org.onosproject.net.flow.DefaultFlowEntry;
45import org.onosproject.net.flow.FlowEntry;
46import org.onosproject.net.flow.FlowEntry.FlowEntryState;
47import org.onosproject.net.flow.FlowId;
48import org.onosproject.net.flow.FlowRule;
49import org.onosproject.net.flow.FlowRuleBatchEntry;
50import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
51import org.onosproject.net.flow.FlowRuleBatchEvent;
52import org.onosproject.net.flow.FlowRuleBatchOperation;
53import org.onosproject.net.flow.FlowRuleBatchRequest;
54import org.onosproject.net.flow.FlowRuleEvent;
55import org.onosproject.net.flow.FlowRuleEvent.Type;
56import org.onosproject.net.flow.FlowRuleService;
57import org.onosproject.net.flow.FlowRuleStore;
58import org.onosproject.net.flow.FlowRuleStoreDelegate;
59import org.onosproject.net.flow.StoredFlowEntry;
60import org.onosproject.store.AbstractStore;
61import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
62import org.onosproject.store.cluster.messaging.ClusterMessage;
63import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Madan Jampanif7536ab2015-05-07 23:23:23 -070064import org.onosproject.store.flow.ReplicaInfoEvent;
65import org.onosproject.store.flow.ReplicaInfoEventListener;
Madan Jampani86940d92015-05-06 11:47:57 -070066import org.onosproject.store.flow.ReplicaInfoService;
67import org.onosproject.store.serializers.KryoSerializer;
68import org.onosproject.store.serializers.StoreSerializer;
Brian O'Connor6de2e202015-05-21 14:30:41 -070069import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
Madan Jampani86940d92015-05-06 11:47:57 -070070import org.osgi.service.component.ComponentContext;
71import org.slf4j.Logger;
72
Madan Jampani86940d92015-05-06 11:47:57 -070073import java.util.Collections;
74import java.util.Dictionary;
75import java.util.HashSet;
76import java.util.List;
77import java.util.Map;
78import java.util.Set;
79import java.util.concurrent.ConcurrentHashMap;
80import java.util.concurrent.ConcurrentMap;
81import java.util.concurrent.ExecutorService;
82import java.util.concurrent.Executors;
83import java.util.concurrent.ScheduledExecutorService;
Madan Jampani08bf17b2015-05-06 16:25:26 -070084import java.util.concurrent.ScheduledFuture;
Madan Jampani86940d92015-05-06 11:47:57 -070085import java.util.concurrent.TimeUnit;
86import java.util.concurrent.atomic.AtomicInteger;
87import java.util.stream.Collectors;
88
89import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
90import static com.google.common.base.Strings.isNullOrEmpty;
91import static org.onlab.util.Tools.get;
92import static org.onlab.util.Tools.groupedThreads;
93import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
94import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
95import static org.slf4j.LoggerFactory.getLogger;
96
97/**
98 * Manages inventory of flow rules using a distributed state management protocol.
99 */
100@Component(immediate = true, enabled = true)
101@Service
102public class NewDistributedFlowRuleStore
103 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
104 implements FlowRuleStore {
105
106 private final Logger log = getLogger(getClass());
107
108 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
109 private static final boolean DEFAULT_BACKUP_ENABLED = true;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700110 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
Madan Jampani86940d92015-05-06 11:47:57 -0700111 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
112
113 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
114 label = "Number of threads in the message handler pool")
115 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
116
117 @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
118 label = "Indicates whether backups are enabled or not")
119 private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
120
Madan Jampani08bf17b2015-05-06 16:25:26 -0700121 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
122 label = "Delay in ms between successive backup runs")
123 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
124
Madan Jampani86940d92015-05-06 11:47:57 -0700125 private InternalFlowTable flowTable = new InternalFlowTable();
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 protected ReplicaInfoService replicaInfoManager;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected ClusterCommunicationService clusterCommunicator;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
134 protected ClusterService clusterService;
135
136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 protected DeviceService deviceService;
138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140 protected CoreService coreService;
141
142 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
143 protected ComponentConfigService configService;
144
145 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
146 protected MastershipService mastershipService;
147
148 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
149 private ExecutorService messageHandlingExecutor;
150
Madan Jampani08bf17b2015-05-06 16:25:26 -0700151 private ScheduledFuture<?> backupTask;
Madan Jampani86940d92015-05-06 11:47:57 -0700152 private final ScheduledExecutorService backupSenderExecutor =
153 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
154
155 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
156 @Override
157 protected void setupKryoPool() {
158 serializerPool = KryoNamespace.newBuilder()
159 .register(DistributedStoreSerializers.STORE_COMMON)
160 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Madan Jampani86940d92015-05-06 11:47:57 -0700161 .build();
162 }
163 };
164
165 private IdGenerator idGenerator;
166 private NodeId local;
167
168 @Activate
169 public void activate(ComponentContext context) {
170 configService.registerProperties(getClass());
171
172 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
173
174 local = clusterService.getLocalNode().id();
175
176 messageHandlingExecutor = Executors.newFixedThreadPool(
177 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
178
179 registerMessageHandlers(messageHandlingExecutor);
180
Madan Jampani08bf17b2015-05-06 16:25:26 -0700181 if (backupEnabled) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700182 replicaInfoManager.addListener(flowTable);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700183 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
184 flowTable::backup,
185 0,
186 backupPeriod,
187 TimeUnit.MILLISECONDS);
188 }
Madan Jampani86940d92015-05-06 11:47:57 -0700189
190 logConfig("Started");
191 }
192
193 @Deactivate
194 public void deactivate(ComponentContext context) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700195 if (backupEnabled) {
196 replicaInfoManager.removeListener(flowTable);
197 backupTask.cancel(true);
198 }
Madan Jampani86940d92015-05-06 11:47:57 -0700199 configService.unregisterProperties(getClass(), false);
200 unregisterMessageHandlers();
201 messageHandlingExecutor.shutdownNow();
202 backupSenderExecutor.shutdownNow();
203 log.info("Stopped");
204 }
205
206 @SuppressWarnings("rawtypes")
207 @Modified
208 public void modified(ComponentContext context) {
209 if (context == null) {
210 backupEnabled = DEFAULT_BACKUP_ENABLED;
211 logConfig("Default config");
212 return;
213 }
214
215 Dictionary properties = context.getProperties();
216 int newPoolSize;
217 boolean newBackupEnabled;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700218 int newBackupPeriod;
Madan Jampani86940d92015-05-06 11:47:57 -0700219 try {
220 String s = get(properties, "msgHandlerPoolSize");
221 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
222
223 s = get(properties, "backupEnabled");
224 newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
225
Madan Jampani08bf17b2015-05-06 16:25:26 -0700226 s = get(properties, "backupPeriod");
227 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
228
Madan Jampani86940d92015-05-06 11:47:57 -0700229 } catch (NumberFormatException | ClassCastException e) {
230 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
231 newBackupEnabled = DEFAULT_BACKUP_ENABLED;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700232 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Madan Jampani86940d92015-05-06 11:47:57 -0700233 }
234
Madan Jampani08bf17b2015-05-06 16:25:26 -0700235 boolean restartBackupTask = false;
Madan Jampani86940d92015-05-06 11:47:57 -0700236 if (newBackupEnabled != backupEnabled) {
237 backupEnabled = newBackupEnabled;
Madan Jampanif7536ab2015-05-07 23:23:23 -0700238 if (!backupEnabled) {
239 replicaInfoManager.removeListener(flowTable);
240 if (backupTask != null) {
241 backupTask.cancel(false);
242 backupTask = null;
243 }
244 } else {
245 replicaInfoManager.addListener(flowTable);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700246 }
247 restartBackupTask = backupEnabled;
248 }
249 if (newBackupPeriod != backupPeriod) {
250 backupPeriod = newBackupPeriod;
251 restartBackupTask = backupEnabled;
252 }
253 if (restartBackupTask) {
254 if (backupTask != null) {
255 // cancel previously running task
256 backupTask.cancel(false);
257 }
258 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
259 flowTable::backup,
260 0,
261 backupPeriod,
262 TimeUnit.MILLISECONDS);
Madan Jampani86940d92015-05-06 11:47:57 -0700263 }
264 if (newPoolSize != msgHandlerPoolSize) {
265 msgHandlerPoolSize = newPoolSize;
266 ExecutorService oldMsgHandler = messageHandlingExecutor;
267 messageHandlingExecutor = Executors.newFixedThreadPool(
268 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
269
270 // replace previously registered handlers.
271 registerMessageHandlers(messageHandlingExecutor);
272 oldMsgHandler.shutdown();
273 }
274 logConfig("Reconfigured");
275 }
276
277 private void registerMessageHandlers(ExecutorService executor) {
278
279 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
280 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
281 REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
282 clusterCommunicator.addSubscriber(
283 GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
284 clusterCommunicator.addSubscriber(
285 GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
286 clusterCommunicator.addSubscriber(
287 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
288 clusterCommunicator.addSubscriber(
289 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
290 clusterCommunicator.addSubscriber(
Madan Jampani654b58a2015-05-22 11:28:11 -0700291 FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, SERIALIZER::encode, executor);
Madan Jampani86940d92015-05-06 11:47:57 -0700292 }
293
294 private void unregisterMessageHandlers() {
295 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
296 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
297 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
298 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
299 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
300 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
301 }
302
303 private void logConfig(String prefix) {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700304 log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
305 prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
Madan Jampani86940d92015-05-06 11:47:57 -0700306 }
307
308 // This is not a efficient operation on a distributed sharded
309 // flow store. We need to revisit the need for this operation or at least
310 // make it device specific.
311 @Override
312 public int getFlowRuleCount() {
313 AtomicInteger sum = new AtomicInteger(0);
314 deviceService.getDevices().forEach(device -> sum.addAndGet(Iterables.size(getFlowEntries(device.id()))));
315 return sum.get();
316 }
317
318 @Override
319 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700320 NodeId master = mastershipService.getMasterFor(rule.deviceId());
Madan Jampani86940d92015-05-06 11:47:57 -0700321
322 if (master == null) {
323 log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
324 return null;
325 }
326
327 if (Objects.equal(local, master)) {
328 return flowTable.getFlowEntry(rule);
329 }
330
331 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
332 master, rule.deviceId());
333
334 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
335 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
336 SERIALIZER::encode,
337 SERIALIZER::decode,
338 master),
339 FLOW_RULE_STORE_TIMEOUT_MILLIS,
340 TimeUnit.MILLISECONDS,
341 null);
342 }
343
344 @Override
345 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700346 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700347
348 if (master == null) {
349 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
350 return Collections.emptyList();
351 }
352
353 if (Objects.equal(local, master)) {
354 return flowTable.getFlowEntries(deviceId);
355 }
356
357 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
358 master, deviceId);
359
360 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
361 FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
362 SERIALIZER::encode,
363 SERIALIZER::decode,
364 master),
365 FLOW_RULE_STORE_TIMEOUT_MILLIS,
366 TimeUnit.MILLISECONDS,
367 Collections.emptyList());
368 }
369
370 @Override
371 public void storeFlowRule(FlowRule rule) {
372 storeBatch(new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700373 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
Madan Jampani86940d92015-05-06 11:47:57 -0700374 rule.deviceId(), idGenerator.getNewId()));
375 }
376
377 @Override
378 public void storeBatch(FlowRuleBatchOperation operation) {
379 if (operation.getOperations().isEmpty()) {
380 notifyDelegate(FlowRuleBatchEvent.completed(
381 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
382 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
383 return;
384 }
385
386 DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700387 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700388
389 if (master == null) {
390 log.warn("No master for {} : flows will be marked for removal", deviceId);
391
392 updateStoreInternal(operation);
393
394 notifyDelegate(FlowRuleBatchEvent.completed(
395 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
396 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
397 return;
398 }
399
400 if (Objects.equal(local, master)) {
401 storeBatchInternal(operation);
402 return;
403 }
404
405 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
406 master, deviceId);
407
Madan Jampani175e8fd2015-05-20 14:10:45 -0700408 clusterCommunicator.unicast(operation,
409 APPLY_BATCH_FLOWS,
410 SERIALIZER::encode,
411 master)
412 .whenComplete((result, error) -> {
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700413 if (error != null) {
Madan Jampani15f1bc42015-05-28 10:51:52 -0700414 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Madan Jampani86940d92015-05-06 11:47:57 -0700415
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700416 Set<FlowRule> allFailures = operation.getOperations()
417 .stream()
418 .map(op -> op.target())
419 .collect(Collectors.toSet());
Madan Jampani86940d92015-05-06 11:47:57 -0700420
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700421 notifyDelegate(FlowRuleBatchEvent.completed(
422 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
423 new CompletedBatchOperation(false, allFailures, deviceId)));
424 }
Madan Jampani175e8fd2015-05-20 14:10:45 -0700425 });
Madan Jampani86940d92015-05-06 11:47:57 -0700426 }
427
428 private void storeBatchInternal(FlowRuleBatchOperation operation) {
429
430 final DeviceId did = operation.deviceId();
431 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
432 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
433 if (currentOps.isEmpty()) {
434 batchOperationComplete(FlowRuleBatchEvent.completed(
435 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
436 new CompletedBatchOperation(true, Collections.emptySet(), did)));
437 return;
438 }
439
440 notifyDelegate(FlowRuleBatchEvent.requested(new
441 FlowRuleBatchRequest(operation.id(),
442 currentOps), operation.deviceId()));
443 }
444
445 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
446 return operation.getOperations().stream().map(
447 op -> {
448 StoredFlowEntry entry;
449 switch (op.operator()) {
450 case ADD:
451 entry = new DefaultFlowEntry(op.target());
452 // always add requested FlowRule
453 // Note: 2 equal FlowEntry may have different treatment
454 flowTable.remove(entry.deviceId(), entry);
455 flowTable.add(entry);
456
457 return op;
458 case REMOVE:
459 entry = flowTable.getFlowEntry(op.target());
460 if (entry != null) {
461 entry.setState(FlowEntryState.PENDING_REMOVE);
462 return op;
463 }
464 break;
465 case MODIFY:
466 //TODO: figure this out at some point
467 break;
468 default:
469 log.warn("Unknown flow operation operator: {}", op.operator());
470 }
471 return null;
472 }
473 ).filter(op -> op != null).collect(Collectors.toSet());
474 }
475
476 @Override
477 public void deleteFlowRule(FlowRule rule) {
478 storeBatch(
479 new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700480 Collections.singletonList(
Madan Jampani86940d92015-05-06 11:47:57 -0700481 new FlowRuleBatchEntry(
482 FlowRuleOperation.REMOVE,
483 rule)), rule.deviceId(), idGenerator.getNewId()));
484 }
485
486 @Override
487 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700488 NodeId master = mastershipService.getMasterFor(rule.deviceId());
489 if (Objects.equal(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700490 return addOrUpdateFlowRuleInternal(rule);
491 }
492
493 log.warn("Tried to update FlowRule {} state,"
494 + " while the Node was not the master.", rule);
495 return null;
496 }
497
498 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
499 // check if this new rule is an update to an existing entry
500 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
501 if (stored != null) {
502 stored.setBytes(rule.bytes());
503 stored.setLife(rule.life());
504 stored.setPackets(rule.packets());
505 if (stored.state() == FlowEntryState.PENDING_ADD) {
506 stored.setState(FlowEntryState.ADDED);
507 return new FlowRuleEvent(Type.RULE_ADDED, rule);
508 }
509 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
510 }
511
512 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
513 // TODO: also update backup if the behavior is correct.
514 flowTable.add(rule);
515 return null;
516 }
517
518 @Override
519 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
520 final DeviceId deviceId = rule.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700521 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700522
523 if (Objects.equal(local, master)) {
524 // bypass and handle it locally
525 return removeFlowRuleInternal(rule);
526 }
527
528 if (master == null) {
529 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
530 // TODO: revisit if this should be null (="no-op") or Exception
531 return null;
532 }
533
534 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
535 master, deviceId);
536
537 return Futures.get(clusterCommunicator.sendAndReceive(
538 rule,
539 REMOVE_FLOW_ENTRY,
540 SERIALIZER::encode,
541 SERIALIZER::decode,
542 master),
543 FLOW_RULE_STORE_TIMEOUT_MILLIS,
544 TimeUnit.MILLISECONDS,
545 RuntimeException.class);
546 }
547
548 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
549 final DeviceId deviceId = rule.deviceId();
550 // This is where one could mark a rule as removed and still keep it in the store.
551 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
552 return removed ? new FlowRuleEvent(RULE_REMOVED, rule) : null;
553 }
554
555 @Override
556 public void batchOperationComplete(FlowRuleBatchEvent event) {
557 //FIXME: need a per device pending response
558 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
559 if (nodeId == null) {
560 notifyDelegate(event);
561 } else {
562 // TODO check unicast return value
563 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
564 //error log: log.warn("Failed to respond to peer for batch operation result");
565 }
566 }
567
568 private final class OnStoreBatch implements ClusterMessageHandler {
569
570 @Override
571 public void handle(final ClusterMessage message) {
572 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
573 log.debug("received batch request {}", operation);
574
575 final DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700576 NodeId master = mastershipService.getMasterFor(deviceId);
577 if (!Objects.equal(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700578 Set<FlowRule> failures = new HashSet<>(operation.size());
579 for (FlowRuleBatchEntry op : operation.getOperations()) {
580 failures.add(op.target());
581 }
582 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
583 // This node is no longer the master, respond as all failed.
584 // TODO: we might want to wrap response in envelope
585 // to distinguish sw programming failure and hand over
586 // it make sense in the latter case to retry immediately.
587 message.respond(SERIALIZER.encode(allFailed));
588 return;
589 }
590
591 pendingResponses.put(operation.id(), message.sender());
592 storeBatchInternal(operation);
593 }
594 }
595
Madan Jampanif7536ab2015-05-07 23:23:23 -0700596 private class InternalFlowTable implements ReplicaInfoEventListener {
Madan Jampani86940d92015-05-06 11:47:57 -0700597
598 private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
599 flowEntries = new ConcurrentHashMap<>();
600
601 private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
602 private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
603 private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
604
605 private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
606 return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
607 }
608
Madan Jampanif7536ab2015-05-07 23:23:23 -0700609 @Override
610 public void event(ReplicaInfoEvent event) {
Madan Jampania98bf932015-06-02 12:01:36 -0700611 if (!backupEnabled) {
612 return;
613 }
Madan Jampanif7536ab2015-05-07 23:23:23 -0700614 if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
615 DeviceId deviceId = event.subject();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700616 NodeId master = mastershipService.getMasterFor(deviceId);
617 if (!Objects.equal(local, master)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700618 // ignore since this event is for a device this node does not manage.
619 return;
620 }
Madan Jampani7267c552015-05-20 22:39:17 -0700621 NodeId newBackupNode = getBackupNode(deviceId);
622 NodeId currentBackupNode = lastBackupNodes.get(deviceId);
623 if (Objects.equal(newBackupNode, currentBackupNode)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700624 // ignore since backup location hasn't changed.
625 return;
626 }
Madan Jampani7267c552015-05-20 22:39:17 -0700627 if (currentBackupNode != null && newBackupNode == null) {
628 // Current backup node is most likely down and no alternate backup node
629 // has been chosen. Clear current backup location so that we can resume
630 // backups when either current backup comes online or a different backup node
631 // is chosen.
632 log.warn("Lost backup location {} for deviceId {} and no alternate backup node exists. "
633 + "Flows can be lost if the master goes down", currentBackupNode, deviceId);
634 lastBackupNodes.remove(deviceId);
635 lastBackupTimes.remove(deviceId);
636 return;
637 // TODO: Pick any available node as backup and ensure hand-off occurs when
638 // a new master is elected.
639 }
640 log.info("Backup location for {} has changed from {} to {}.",
641 deviceId, currentBackupNode, newBackupNode);
642 backupSenderExecutor.schedule(() -> backupFlowEntries(newBackupNode, Sets.newHashSet(deviceId)),
Madan Jampani03062682015-05-19 17:57:51 -0700643 0,
644 TimeUnit.SECONDS);
Madan Jampanif7536ab2015-05-07 23:23:23 -0700645 }
646 }
647
648 private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
649 log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
Madan Jampani654b58a2015-05-22 11:28:11 -0700650 Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
Madan Jampanif7536ab2015-05-07 23:23:23 -0700651 Maps.newConcurrentMap();
652 flowEntries.forEach((key, value) -> {
653 if (deviceIds.contains(key)) {
654 deviceFlowEntries.put(key, value);
655 }
656 });
Madan Jampani654b58a2015-05-22 11:28:11 -0700657 clusterCommunicator.<Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>, Set<DeviceId>>sendAndReceive(
658 deviceFlowEntries,
659 FLOW_TABLE_BACKUP,
660 SERIALIZER::encode,
661 SERIALIZER::decode,
662 nodeId)
663 .whenComplete((backedupDevices, error) -> {
664 Set<DeviceId> devicesNotBackedup = error != null ?
665 deviceFlowEntries.keySet() :
666 Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
667 if (devicesNotBackedup.size() > 0) {
668 log.warn("Failed to backup devices: {}", devicesNotBackedup, error);
669 }
670 if (backedupDevices != null) {
671 backedupDevices.forEach(id -> {
672 lastBackupTimes.put(id, System.currentTimeMillis());
673 lastBackupNodes.put(id, nodeId);
674 });
675 }
676 });
Madan Jampanif7536ab2015-05-07 23:23:23 -0700677 }
678
Madan Jampani86940d92015-05-06 11:47:57 -0700679 /**
680 * Returns the flow table for specified device.
681 *
682 * @param deviceId identifier of the device
683 * @return Map representing Flow Table of given device.
684 */
685 private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
686 return createIfAbsentUnchecked(flowEntries, deviceId, lazyEmptyFlowTable());
687 }
688
689 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
690 return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
691 }
692
693 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
694 Set<StoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.deviceId(), rule.id());
695 return flowEntries.stream()
696 .filter(entry -> Objects.equal(entry, rule))
697 .findAny()
698 .orElse(null);
699 }
700
701 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
702 Set<FlowEntry> result = Sets.newHashSet();
703 getFlowTable(deviceId).values().forEach(result::addAll);
704 return result;
705 }
706
707 public StoredFlowEntry getFlowEntry(FlowRule rule) {
708 return getFlowEntryInternal(rule);
709 }
710
711 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
712 return getFlowEntriesInternal(deviceId);
713 }
714
715 public void add(FlowEntry rule) {
716 getFlowEntriesInternal(rule.deviceId(), rule.id()).add((StoredFlowEntry) rule);
717 lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
718 }
719
720 public boolean remove(DeviceId deviceId, FlowEntry rule) {
721 try {
722 return getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
723 } finally {
724 lastUpdateTimes.put(deviceId, System.currentTimeMillis());
725 }
726 }
727
728 private NodeId getBackupNode(DeviceId deviceId) {
729 List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
730 // pick the standby which is most likely to become next master
731 return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
732 }
733
734 private void backup() {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700735 if (!backupEnabled) {
736 return;
737 }
Madan Jampani86940d92015-05-06 11:47:57 -0700738 try {
739 // determine the set of devices that we need to backup during this run.
740 Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
741 .stream()
742 .filter(deviceId -> {
743 Long lastBackupTime = lastBackupTimes.get(deviceId);
744 Long lastUpdateTime = lastUpdateTimes.get(deviceId);
745 NodeId lastBackupNode = lastBackupNodes.get(deviceId);
Madan Jampani7267c552015-05-20 22:39:17 -0700746 NodeId newBackupNode = getBackupNode(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700747 return lastBackupTime == null
Madan Jampani7267c552015-05-20 22:39:17 -0700748 || !Objects.equal(lastBackupNode, newBackupNode)
Madan Jampani86940d92015-05-06 11:47:57 -0700749 || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
750 })
751 .collect(Collectors.toSet());
752
753 // compute a mapping from node to the set of devices whose flow entries it should backup
754 Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
755 devicesToBackup.forEach(deviceId -> {
756 NodeId backupLocation = getBackupNode(deviceId);
757 if (backupLocation != null) {
758 devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
759 .add(deviceId);
760 }
761 });
Madan Jampani86940d92015-05-06 11:47:57 -0700762 // send the device flow entries to their respective backup nodes
Madan Jampanif7536ab2015-05-07 23:23:23 -0700763 devicesToBackupByNode.forEach(this::backupFlowEntries);
Madan Jampani86940d92015-05-06 11:47:57 -0700764 } catch (Exception e) {
765 log.error("Backup failed.", e);
766 }
767 }
768
Madan Jampani654b58a2015-05-22 11:28:11 -0700769 private Set<DeviceId> onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
Madan Jampani7267c552015-05-20 22:39:17 -0700770 log.debug("Received flowEntries for {} to backup", flowTables.keySet());
Madan Jampani654b58a2015-05-22 11:28:11 -0700771 Set<DeviceId> backedupDevices = Sets.newHashSet();
772 try {
Madan Jampania98bf932015-06-02 12:01:36 -0700773 flowTables.forEach((deviceId, deviceFlowTable) -> {
774 // Only process those devices are that not managed by the local node.
775 if (!Objects.equal(local, mastershipService.getMasterFor(deviceId))) {
776 Map<FlowId, Set<StoredFlowEntry>> backupFlowTable = getFlowTable(deviceId);
777 backupFlowTable.clear();
778 backupFlowTable.putAll(deviceFlowTable);
779 backedupDevices.add(deviceId);
780 }
Madan Jampani08bf17b2015-05-06 16:25:26 -0700781 });
Madan Jampani654b58a2015-05-22 11:28:11 -0700782 } catch (Exception e) {
783 log.warn("Failure processing backup request", e);
784 }
785 return backedupDevices;
Madan Jampani86940d92015-05-06 11:47:57 -0700786 }
787 }
788}