blob: a95eee1adfc332f8d2e4100fc40125662ec7b2c4 [file] [log] [blame]
/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.flow.impl;
17
18import com.google.common.base.Objects;
19import com.google.common.collect.Iterables;
20import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
22import com.google.common.util.concurrent.Futures;
23
24import org.apache.felix.scr.annotations.Activate;
25import org.apache.felix.scr.annotations.Component;
26import org.apache.felix.scr.annotations.Deactivate;
27import org.apache.felix.scr.annotations.Modified;
28import org.apache.felix.scr.annotations.Property;
29import org.apache.felix.scr.annotations.Reference;
30import org.apache.felix.scr.annotations.ReferenceCardinality;
31import org.apache.felix.scr.annotations.Service;
32import org.onlab.util.KryoNamespace;
Madan Jampani86940d92015-05-06 11:47:57 -070033import org.onlab.util.Tools;
34import org.onosproject.cfg.ComponentConfigService;
35import org.onosproject.cluster.ClusterService;
36import org.onosproject.cluster.NodeId;
37import org.onosproject.core.CoreService;
38import org.onosproject.core.IdGenerator;
39import org.onosproject.mastership.MastershipService;
40import org.onosproject.net.DeviceId;
41import org.onosproject.net.device.DeviceService;
42import org.onosproject.net.flow.CompletedBatchOperation;
43import org.onosproject.net.flow.DefaultFlowEntry;
44import org.onosproject.net.flow.FlowEntry;
45import org.onosproject.net.flow.FlowEntry.FlowEntryState;
46import org.onosproject.net.flow.FlowId;
47import org.onosproject.net.flow.FlowRule;
48import org.onosproject.net.flow.FlowRuleBatchEntry;
49import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
50import org.onosproject.net.flow.FlowRuleBatchEvent;
51import org.onosproject.net.flow.FlowRuleBatchOperation;
52import org.onosproject.net.flow.FlowRuleBatchRequest;
53import org.onosproject.net.flow.FlowRuleEvent;
54import org.onosproject.net.flow.FlowRuleEvent.Type;
55import org.onosproject.net.flow.FlowRuleService;
56import org.onosproject.net.flow.FlowRuleStore;
57import org.onosproject.net.flow.FlowRuleStoreDelegate;
58import org.onosproject.net.flow.StoredFlowEntry;
59import org.onosproject.store.AbstractStore;
60import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
61import org.onosproject.store.cluster.messaging.ClusterMessage;
62import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Madan Jampanif7536ab2015-05-07 23:23:23 -070063import org.onosproject.store.flow.ReplicaInfoEvent;
64import org.onosproject.store.flow.ReplicaInfoEventListener;
Madan Jampani86940d92015-05-06 11:47:57 -070065import org.onosproject.store.flow.ReplicaInfoService;
66import org.onosproject.store.serializers.KryoSerializer;
67import org.onosproject.store.serializers.StoreSerializer;
Brian O'Connor6de2e202015-05-21 14:30:41 -070068import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
Madan Jampani86940d92015-05-06 11:47:57 -070069import org.osgi.service.component.ComponentContext;
70import org.slf4j.Logger;
71
Madan Jampani86940d92015-05-06 11:47:57 -070072import java.util.Collections;
73import java.util.Dictionary;
74import java.util.HashSet;
75import java.util.List;
76import java.util.Map;
77import java.util.Set;
Madan Jampani86940d92015-05-06 11:47:57 -070078import java.util.concurrent.ExecutorService;
79import java.util.concurrent.Executors;
80import java.util.concurrent.ScheduledExecutorService;
Madan Jampani08bf17b2015-05-06 16:25:26 -070081import java.util.concurrent.ScheduledFuture;
Madan Jampani86940d92015-05-06 11:47:57 -070082import java.util.concurrent.TimeUnit;
83import java.util.concurrent.atomic.AtomicInteger;
84import java.util.stream.Collectors;
85
Madan Jampani86940d92015-05-06 11:47:57 -070086import static com.google.common.base.Strings.isNullOrEmpty;
87import static org.onlab.util.Tools.get;
88import static org.onlab.util.Tools.groupedThreads;
89import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
90import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
91import static org.slf4j.LoggerFactory.getLogger;
92
93/**
94 * Manages inventory of flow rules using a distributed state management protocol.
95 */
96@Component(immediate = true, enabled = true)
97@Service
98public class NewDistributedFlowRuleStore
99 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
100 implements FlowRuleStore {
101
102 private final Logger log = getLogger(getClass());
103
104 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
105 private static final boolean DEFAULT_BACKUP_ENABLED = true;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700106 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
Madan Jampani86940d92015-05-06 11:47:57 -0700107 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
108
109 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
110 label = "Number of threads in the message handler pool")
111 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
112
113 @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
114 label = "Indicates whether backups are enabled or not")
115 private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
116
Madan Jampani08bf17b2015-05-06 16:25:26 -0700117 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
118 label = "Delay in ms between successive backup runs")
119 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
120
Madan Jampani86940d92015-05-06 11:47:57 -0700121 private InternalFlowTable flowTable = new InternalFlowTable();
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
124 protected ReplicaInfoService replicaInfoManager;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
127 protected ClusterCommunicationService clusterCommunicator;
128
129 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
130 protected ClusterService clusterService;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
133 protected DeviceService deviceService;
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
136 protected CoreService coreService;
137
138 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
139 protected ComponentConfigService configService;
140
141 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
142 protected MastershipService mastershipService;
143
144 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
145 private ExecutorService messageHandlingExecutor;
146
Madan Jampani08bf17b2015-05-06 16:25:26 -0700147 private ScheduledFuture<?> backupTask;
Madan Jampani86940d92015-05-06 11:47:57 -0700148 private final ScheduledExecutorService backupSenderExecutor =
149 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
150
151 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
152 @Override
153 protected void setupKryoPool() {
154 serializerPool = KryoNamespace.newBuilder()
155 .register(DistributedStoreSerializers.STORE_COMMON)
156 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Madan Jampani86940d92015-05-06 11:47:57 -0700157 .build();
158 }
159 };
160
161 private IdGenerator idGenerator;
162 private NodeId local;
163
164 @Activate
165 public void activate(ComponentContext context) {
166 configService.registerProperties(getClass());
167
168 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
169
170 local = clusterService.getLocalNode().id();
171
172 messageHandlingExecutor = Executors.newFixedThreadPool(
173 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
174
175 registerMessageHandlers(messageHandlingExecutor);
176
Madan Jampani08bf17b2015-05-06 16:25:26 -0700177 if (backupEnabled) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700178 replicaInfoManager.addListener(flowTable);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700179 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
180 flowTable::backup,
181 0,
182 backupPeriod,
183 TimeUnit.MILLISECONDS);
184 }
Madan Jampani86940d92015-05-06 11:47:57 -0700185
186 logConfig("Started");
187 }
188
189 @Deactivate
190 public void deactivate(ComponentContext context) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700191 if (backupEnabled) {
192 replicaInfoManager.removeListener(flowTable);
193 backupTask.cancel(true);
194 }
Madan Jampani86940d92015-05-06 11:47:57 -0700195 configService.unregisterProperties(getClass(), false);
196 unregisterMessageHandlers();
197 messageHandlingExecutor.shutdownNow();
198 backupSenderExecutor.shutdownNow();
199 log.info("Stopped");
200 }
201
202 @SuppressWarnings("rawtypes")
203 @Modified
204 public void modified(ComponentContext context) {
205 if (context == null) {
206 backupEnabled = DEFAULT_BACKUP_ENABLED;
207 logConfig("Default config");
208 return;
209 }
210
211 Dictionary properties = context.getProperties();
212 int newPoolSize;
213 boolean newBackupEnabled;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700214 int newBackupPeriod;
Madan Jampani86940d92015-05-06 11:47:57 -0700215 try {
216 String s = get(properties, "msgHandlerPoolSize");
217 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
218
219 s = get(properties, "backupEnabled");
220 newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
221
Madan Jampani08bf17b2015-05-06 16:25:26 -0700222 s = get(properties, "backupPeriod");
223 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
224
Madan Jampani86940d92015-05-06 11:47:57 -0700225 } catch (NumberFormatException | ClassCastException e) {
226 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
227 newBackupEnabled = DEFAULT_BACKUP_ENABLED;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700228 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Madan Jampani86940d92015-05-06 11:47:57 -0700229 }
230
Madan Jampani08bf17b2015-05-06 16:25:26 -0700231 boolean restartBackupTask = false;
Madan Jampani86940d92015-05-06 11:47:57 -0700232 if (newBackupEnabled != backupEnabled) {
233 backupEnabled = newBackupEnabled;
Madan Jampanif7536ab2015-05-07 23:23:23 -0700234 if (!backupEnabled) {
235 replicaInfoManager.removeListener(flowTable);
236 if (backupTask != null) {
237 backupTask.cancel(false);
238 backupTask = null;
239 }
240 } else {
241 replicaInfoManager.addListener(flowTable);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700242 }
243 restartBackupTask = backupEnabled;
244 }
245 if (newBackupPeriod != backupPeriod) {
246 backupPeriod = newBackupPeriod;
247 restartBackupTask = backupEnabled;
248 }
249 if (restartBackupTask) {
250 if (backupTask != null) {
251 // cancel previously running task
252 backupTask.cancel(false);
253 }
254 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
255 flowTable::backup,
256 0,
257 backupPeriod,
258 TimeUnit.MILLISECONDS);
Madan Jampani86940d92015-05-06 11:47:57 -0700259 }
260 if (newPoolSize != msgHandlerPoolSize) {
261 msgHandlerPoolSize = newPoolSize;
262 ExecutorService oldMsgHandler = messageHandlingExecutor;
263 messageHandlingExecutor = Executors.newFixedThreadPool(
264 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
265
266 // replace previously registered handlers.
267 registerMessageHandlers(messageHandlingExecutor);
268 oldMsgHandler.shutdown();
269 }
270 logConfig("Reconfigured");
271 }
272
273 private void registerMessageHandlers(ExecutorService executor) {
274
275 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
276 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
277 REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
278 clusterCommunicator.addSubscriber(
279 GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
280 clusterCommunicator.addSubscriber(
281 GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
282 clusterCommunicator.addSubscriber(
283 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
284 clusterCommunicator.addSubscriber(
285 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
286 clusterCommunicator.addSubscriber(
Madan Jampani654b58a2015-05-22 11:28:11 -0700287 FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, SERIALIZER::encode, executor);
Madan Jampani86940d92015-05-06 11:47:57 -0700288 }
289
290 private void unregisterMessageHandlers() {
291 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
292 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
293 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
294 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
295 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
296 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
297 }
298
299 private void logConfig(String prefix) {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700300 log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
301 prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
Madan Jampani86940d92015-05-06 11:47:57 -0700302 }
303
304 // This is not a efficient operation on a distributed sharded
305 // flow store. We need to revisit the need for this operation or at least
306 // make it device specific.
307 @Override
308 public int getFlowRuleCount() {
309 AtomicInteger sum = new AtomicInteger(0);
310 deviceService.getDevices().forEach(device -> sum.addAndGet(Iterables.size(getFlowEntries(device.id()))));
311 return sum.get();
312 }
313
314 @Override
315 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700316 NodeId master = mastershipService.getMasterFor(rule.deviceId());
Madan Jampani86940d92015-05-06 11:47:57 -0700317
318 if (master == null) {
319 log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
320 return null;
321 }
322
323 if (Objects.equal(local, master)) {
324 return flowTable.getFlowEntry(rule);
325 }
326
327 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
328 master, rule.deviceId());
329
330 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
331 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
332 SERIALIZER::encode,
333 SERIALIZER::decode,
334 master),
335 FLOW_RULE_STORE_TIMEOUT_MILLIS,
336 TimeUnit.MILLISECONDS,
337 null);
338 }
339
340 @Override
341 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700342 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700343
344 if (master == null) {
345 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
346 return Collections.emptyList();
347 }
348
349 if (Objects.equal(local, master)) {
350 return flowTable.getFlowEntries(deviceId);
351 }
352
353 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
354 master, deviceId);
355
356 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
357 FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
358 SERIALIZER::encode,
359 SERIALIZER::decode,
360 master),
361 FLOW_RULE_STORE_TIMEOUT_MILLIS,
362 TimeUnit.MILLISECONDS,
363 Collections.emptyList());
364 }
365
366 @Override
367 public void storeFlowRule(FlowRule rule) {
368 storeBatch(new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700369 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
Madan Jampani86940d92015-05-06 11:47:57 -0700370 rule.deviceId(), idGenerator.getNewId()));
371 }
372
373 @Override
374 public void storeBatch(FlowRuleBatchOperation operation) {
375 if (operation.getOperations().isEmpty()) {
376 notifyDelegate(FlowRuleBatchEvent.completed(
377 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
378 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
379 return;
380 }
381
382 DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700383 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700384
385 if (master == null) {
386 log.warn("No master for {} : flows will be marked for removal", deviceId);
387
388 updateStoreInternal(operation);
389
390 notifyDelegate(FlowRuleBatchEvent.completed(
391 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
392 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
393 return;
394 }
395
396 if (Objects.equal(local, master)) {
397 storeBatchInternal(operation);
398 return;
399 }
400
401 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
402 master, deviceId);
403
Madan Jampani175e8fd2015-05-20 14:10:45 -0700404 clusterCommunicator.unicast(operation,
405 APPLY_BATCH_FLOWS,
406 SERIALIZER::encode,
407 master)
408 .whenComplete((result, error) -> {
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700409 if (error != null) {
Madan Jampani15f1bc42015-05-28 10:51:52 -0700410 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Madan Jampani86940d92015-05-06 11:47:57 -0700411
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700412 Set<FlowRule> allFailures = operation.getOperations()
413 .stream()
414 .map(op -> op.target())
415 .collect(Collectors.toSet());
Madan Jampani86940d92015-05-06 11:47:57 -0700416
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700417 notifyDelegate(FlowRuleBatchEvent.completed(
418 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
419 new CompletedBatchOperation(false, allFailures, deviceId)));
420 }
Madan Jampani175e8fd2015-05-20 14:10:45 -0700421 });
Madan Jampani86940d92015-05-06 11:47:57 -0700422 }
423
424 private void storeBatchInternal(FlowRuleBatchOperation operation) {
425
426 final DeviceId did = operation.deviceId();
427 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
428 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
429 if (currentOps.isEmpty()) {
430 batchOperationComplete(FlowRuleBatchEvent.completed(
431 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
432 new CompletedBatchOperation(true, Collections.emptySet(), did)));
433 return;
434 }
435
436 notifyDelegate(FlowRuleBatchEvent.requested(new
437 FlowRuleBatchRequest(operation.id(),
438 currentOps), operation.deviceId()));
439 }
440
441 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
442 return operation.getOperations().stream().map(
443 op -> {
444 StoredFlowEntry entry;
445 switch (op.operator()) {
446 case ADD:
447 entry = new DefaultFlowEntry(op.target());
448 // always add requested FlowRule
449 // Note: 2 equal FlowEntry may have different treatment
450 flowTable.remove(entry.deviceId(), entry);
451 flowTable.add(entry);
452
453 return op;
454 case REMOVE:
455 entry = flowTable.getFlowEntry(op.target());
456 if (entry != null) {
457 entry.setState(FlowEntryState.PENDING_REMOVE);
458 return op;
459 }
460 break;
461 case MODIFY:
462 //TODO: figure this out at some point
463 break;
464 default:
465 log.warn("Unknown flow operation operator: {}", op.operator());
466 }
467 return null;
468 }
469 ).filter(op -> op != null).collect(Collectors.toSet());
470 }
471
472 @Override
473 public void deleteFlowRule(FlowRule rule) {
474 storeBatch(
475 new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700476 Collections.singletonList(
Madan Jampani86940d92015-05-06 11:47:57 -0700477 new FlowRuleBatchEntry(
478 FlowRuleOperation.REMOVE,
479 rule)), rule.deviceId(), idGenerator.getNewId()));
480 }
481
482 @Override
483 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700484 NodeId master = mastershipService.getMasterFor(rule.deviceId());
485 if (Objects.equal(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700486 return addOrUpdateFlowRuleInternal(rule);
487 }
488
489 log.warn("Tried to update FlowRule {} state,"
490 + " while the Node was not the master.", rule);
491 return null;
492 }
493
494 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
495 // check if this new rule is an update to an existing entry
496 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
497 if (stored != null) {
498 stored.setBytes(rule.bytes());
499 stored.setLife(rule.life());
500 stored.setPackets(rule.packets());
501 if (stored.state() == FlowEntryState.PENDING_ADD) {
502 stored.setState(FlowEntryState.ADDED);
503 return new FlowRuleEvent(Type.RULE_ADDED, rule);
504 }
505 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
506 }
507
508 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
509 // TODO: also update backup if the behavior is correct.
510 flowTable.add(rule);
511 return null;
512 }
513
514 @Override
515 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
516 final DeviceId deviceId = rule.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700517 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700518
519 if (Objects.equal(local, master)) {
520 // bypass and handle it locally
521 return removeFlowRuleInternal(rule);
522 }
523
524 if (master == null) {
525 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
526 // TODO: revisit if this should be null (="no-op") or Exception
527 return null;
528 }
529
530 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
531 master, deviceId);
532
533 return Futures.get(clusterCommunicator.sendAndReceive(
534 rule,
535 REMOVE_FLOW_ENTRY,
536 SERIALIZER::encode,
537 SERIALIZER::decode,
538 master),
539 FLOW_RULE_STORE_TIMEOUT_MILLIS,
540 TimeUnit.MILLISECONDS,
541 RuntimeException.class);
542 }
543
544 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
545 final DeviceId deviceId = rule.deviceId();
546 // This is where one could mark a rule as removed and still keep it in the store.
547 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
548 return removed ? new FlowRuleEvent(RULE_REMOVED, rule) : null;
549 }
550
551 @Override
552 public void batchOperationComplete(FlowRuleBatchEvent event) {
553 //FIXME: need a per device pending response
554 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
555 if (nodeId == null) {
556 notifyDelegate(event);
557 } else {
558 // TODO check unicast return value
559 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
560 //error log: log.warn("Failed to respond to peer for batch operation result");
561 }
562 }
563
564 private final class OnStoreBatch implements ClusterMessageHandler {
565
566 @Override
567 public void handle(final ClusterMessage message) {
568 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
569 log.debug("received batch request {}", operation);
570
571 final DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700572 NodeId master = mastershipService.getMasterFor(deviceId);
573 if (!Objects.equal(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700574 Set<FlowRule> failures = new HashSet<>(operation.size());
575 for (FlowRuleBatchEntry op : operation.getOperations()) {
576 failures.add(op.target());
577 }
578 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
579 // This node is no longer the master, respond as all failed.
580 // TODO: we might want to wrap response in envelope
581 // to distinguish sw programming failure and hand over
582 // it make sense in the latter case to retry immediately.
583 message.respond(SERIALIZER.encode(allFailed));
584 return;
585 }
586
587 pendingResponses.put(operation.id(), message.sender());
588 storeBatchInternal(operation);
589 }
590 }
591
Madan Jampanif7536ab2015-05-07 23:23:23 -0700592 private class InternalFlowTable implements ReplicaInfoEventListener {
Madan Jampani86940d92015-05-06 11:47:57 -0700593
Madan Jampani5c3766c2015-06-02 15:54:41 -0700594 private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>
595 flowEntries = Maps.newConcurrentMap();
Madan Jampani86940d92015-05-06 11:47:57 -0700596
597 private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
598 private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
599 private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
600
Madan Jampanif7536ab2015-05-07 23:23:23 -0700601 @Override
602 public void event(ReplicaInfoEvent event) {
Madan Jampania98bf932015-06-02 12:01:36 -0700603 if (!backupEnabled) {
604 return;
605 }
Madan Jampanif7536ab2015-05-07 23:23:23 -0700606 if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
607 DeviceId deviceId = event.subject();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700608 NodeId master = mastershipService.getMasterFor(deviceId);
609 if (!Objects.equal(local, master)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700610 // ignore since this event is for a device this node does not manage.
611 return;
612 }
Madan Jampani7267c552015-05-20 22:39:17 -0700613 NodeId newBackupNode = getBackupNode(deviceId);
614 NodeId currentBackupNode = lastBackupNodes.get(deviceId);
615 if (Objects.equal(newBackupNode, currentBackupNode)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700616 // ignore since backup location hasn't changed.
617 return;
618 }
Madan Jampani7267c552015-05-20 22:39:17 -0700619 if (currentBackupNode != null && newBackupNode == null) {
620 // Current backup node is most likely down and no alternate backup node
621 // has been chosen. Clear current backup location so that we can resume
622 // backups when either current backup comes online or a different backup node
623 // is chosen.
624 log.warn("Lost backup location {} for deviceId {} and no alternate backup node exists. "
625 + "Flows can be lost if the master goes down", currentBackupNode, deviceId);
626 lastBackupNodes.remove(deviceId);
627 lastBackupTimes.remove(deviceId);
628 return;
629 // TODO: Pick any available node as backup and ensure hand-off occurs when
630 // a new master is elected.
631 }
632 log.info("Backup location for {} has changed from {} to {}.",
633 deviceId, currentBackupNode, newBackupNode);
634 backupSenderExecutor.schedule(() -> backupFlowEntries(newBackupNode, Sets.newHashSet(deviceId)),
Madan Jampani03062682015-05-19 17:57:51 -0700635 0,
636 TimeUnit.SECONDS);
Madan Jampanif7536ab2015-05-07 23:23:23 -0700637 }
638 }
639
640 private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
641 log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
Madan Jampani654b58a2015-05-22 11:28:11 -0700642 Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
Madan Jampanif7536ab2015-05-07 23:23:23 -0700643 Maps.newConcurrentMap();
644 flowEntries.forEach((key, value) -> {
645 if (deviceIds.contains(key)) {
646 deviceFlowEntries.put(key, value);
647 }
648 });
Madan Jampani654b58a2015-05-22 11:28:11 -0700649 clusterCommunicator.<Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>, Set<DeviceId>>sendAndReceive(
650 deviceFlowEntries,
651 FLOW_TABLE_BACKUP,
652 SERIALIZER::encode,
653 SERIALIZER::decode,
654 nodeId)
655 .whenComplete((backedupDevices, error) -> {
656 Set<DeviceId> devicesNotBackedup = error != null ?
657 deviceFlowEntries.keySet() :
658 Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
659 if (devicesNotBackedup.size() > 0) {
660 log.warn("Failed to backup devices: {}", devicesNotBackedup, error);
661 }
662 if (backedupDevices != null) {
663 backedupDevices.forEach(id -> {
664 lastBackupTimes.put(id, System.currentTimeMillis());
665 lastBackupNodes.put(id, nodeId);
666 });
667 }
668 });
Madan Jampanif7536ab2015-05-07 23:23:23 -0700669 }
670
Madan Jampani86940d92015-05-06 11:47:57 -0700671 /**
672 * Returns the flow table for specified device.
673 *
674 * @param deviceId identifier of the device
675 * @return Map representing Flow Table of given device.
676 */
Madan Jampani5c3766c2015-06-02 15:54:41 -0700677 private Map<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
678 return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
Madan Jampani86940d92015-05-06 11:47:57 -0700679 }
680
681 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
682 return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
683 }
684
685 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
686 Set<StoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.deviceId(), rule.id());
687 return flowEntries.stream()
688 .filter(entry -> Objects.equal(entry, rule))
689 .findAny()
690 .orElse(null);
691 }
692
693 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
694 Set<FlowEntry> result = Sets.newHashSet();
695 getFlowTable(deviceId).values().forEach(result::addAll);
696 return result;
697 }
698
699 public StoredFlowEntry getFlowEntry(FlowRule rule) {
700 return getFlowEntryInternal(rule);
701 }
702
703 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
704 return getFlowEntriesInternal(deviceId);
705 }
706
707 public void add(FlowEntry rule) {
708 getFlowEntriesInternal(rule.deviceId(), rule.id()).add((StoredFlowEntry) rule);
709 lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
710 }
711
712 public boolean remove(DeviceId deviceId, FlowEntry rule) {
713 try {
714 return getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
715 } finally {
716 lastUpdateTimes.put(deviceId, System.currentTimeMillis());
717 }
718 }
719
720 private NodeId getBackupNode(DeviceId deviceId) {
721 List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
722 // pick the standby which is most likely to become next master
723 return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
724 }
725
726 private void backup() {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700727 if (!backupEnabled) {
728 return;
729 }
Madan Jampani86940d92015-05-06 11:47:57 -0700730 try {
731 // determine the set of devices that we need to backup during this run.
732 Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
733 .stream()
734 .filter(deviceId -> {
735 Long lastBackupTime = lastBackupTimes.get(deviceId);
736 Long lastUpdateTime = lastUpdateTimes.get(deviceId);
737 NodeId lastBackupNode = lastBackupNodes.get(deviceId);
Madan Jampani7267c552015-05-20 22:39:17 -0700738 NodeId newBackupNode = getBackupNode(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700739 return lastBackupTime == null
Madan Jampani7267c552015-05-20 22:39:17 -0700740 || !Objects.equal(lastBackupNode, newBackupNode)
Madan Jampani86940d92015-05-06 11:47:57 -0700741 || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
742 })
743 .collect(Collectors.toSet());
744
745 // compute a mapping from node to the set of devices whose flow entries it should backup
746 Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
747 devicesToBackup.forEach(deviceId -> {
748 NodeId backupLocation = getBackupNode(deviceId);
749 if (backupLocation != null) {
750 devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
751 .add(deviceId);
752 }
753 });
Madan Jampani86940d92015-05-06 11:47:57 -0700754 // send the device flow entries to their respective backup nodes
Madan Jampanif7536ab2015-05-07 23:23:23 -0700755 devicesToBackupByNode.forEach(this::backupFlowEntries);
Madan Jampani86940d92015-05-06 11:47:57 -0700756 } catch (Exception e) {
757 log.error("Backup failed.", e);
758 }
759 }
760
Madan Jampani654b58a2015-05-22 11:28:11 -0700761 private Set<DeviceId> onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
Madan Jampani7267c552015-05-20 22:39:17 -0700762 log.debug("Received flowEntries for {} to backup", flowTables.keySet());
Madan Jampani654b58a2015-05-22 11:28:11 -0700763 Set<DeviceId> backedupDevices = Sets.newHashSet();
764 try {
Madan Jampania98bf932015-06-02 12:01:36 -0700765 flowTables.forEach((deviceId, deviceFlowTable) -> {
766 // Only process those devices are that not managed by the local node.
767 if (!Objects.equal(local, mastershipService.getMasterFor(deviceId))) {
768 Map<FlowId, Set<StoredFlowEntry>> backupFlowTable = getFlowTable(deviceId);
769 backupFlowTable.clear();
770 backupFlowTable.putAll(deviceFlowTable);
771 backedupDevices.add(deviceId);
772 }
Madan Jampani08bf17b2015-05-06 16:25:26 -0700773 });
Madan Jampani654b58a2015-05-22 11:28:11 -0700774 } catch (Exception e) {
775 log.warn("Failure processing backup request", e);
776 }
777 return backedupDevices;
Madan Jampani86940d92015-05-06 11:47:57 -0700778 }
779 }
780}