/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.flow.impl;
17
18import com.google.common.base.Objects;
Madan Jampani85a9b0d2015-06-03 17:15:44 -070019import com.google.common.collect.ImmutableMap;
Madan Jampani86940d92015-05-06 11:47:57 -070020import com.google.common.collect.Iterables;
21import com.google.common.collect.Maps;
22import com.google.common.collect.Sets;
23import com.google.common.util.concurrent.Futures;
24
25import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Modified;
29import org.apache.felix.scr.annotations.Property;
30import org.apache.felix.scr.annotations.Reference;
31import org.apache.felix.scr.annotations.ReferenceCardinality;
32import org.apache.felix.scr.annotations.Service;
33import org.onlab.util.KryoNamespace;
Madan Jampani86940d92015-05-06 11:47:57 -070034import org.onlab.util.Tools;
35import org.onosproject.cfg.ComponentConfigService;
36import org.onosproject.cluster.ClusterService;
37import org.onosproject.cluster.NodeId;
38import org.onosproject.core.CoreService;
39import org.onosproject.core.IdGenerator;
40import org.onosproject.mastership.MastershipService;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.device.DeviceService;
43import org.onosproject.net.flow.CompletedBatchOperation;
44import org.onosproject.net.flow.DefaultFlowEntry;
45import org.onosproject.net.flow.FlowEntry;
46import org.onosproject.net.flow.FlowEntry.FlowEntryState;
47import org.onosproject.net.flow.FlowId;
48import org.onosproject.net.flow.FlowRule;
49import org.onosproject.net.flow.FlowRuleBatchEntry;
50import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
51import org.onosproject.net.flow.FlowRuleBatchEvent;
52import org.onosproject.net.flow.FlowRuleBatchOperation;
53import org.onosproject.net.flow.FlowRuleBatchRequest;
54import org.onosproject.net.flow.FlowRuleEvent;
55import org.onosproject.net.flow.FlowRuleEvent.Type;
56import org.onosproject.net.flow.FlowRuleService;
57import org.onosproject.net.flow.FlowRuleStore;
58import org.onosproject.net.flow.FlowRuleStoreDelegate;
59import org.onosproject.net.flow.StoredFlowEntry;
60import org.onosproject.store.AbstractStore;
61import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
62import org.onosproject.store.cluster.messaging.ClusterMessage;
63import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
Madan Jampanif7536ab2015-05-07 23:23:23 -070064import org.onosproject.store.flow.ReplicaInfoEvent;
65import org.onosproject.store.flow.ReplicaInfoEventListener;
Madan Jampani86940d92015-05-06 11:47:57 -070066import org.onosproject.store.flow.ReplicaInfoService;
67import org.onosproject.store.serializers.KryoSerializer;
68import org.onosproject.store.serializers.StoreSerializer;
Brian O'Connor6de2e202015-05-21 14:30:41 -070069import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
Madan Jampani86940d92015-05-06 11:47:57 -070070import org.osgi.service.component.ComponentContext;
71import org.slf4j.Logger;
72
Madan Jampani86940d92015-05-06 11:47:57 -070073import java.util.Collections;
74import java.util.Dictionary;
75import java.util.HashSet;
76import java.util.List;
77import java.util.Map;
78import java.util.Set;
Madan Jampani86940d92015-05-06 11:47:57 -070079import java.util.concurrent.ExecutorService;
80import java.util.concurrent.Executors;
81import java.util.concurrent.ScheduledExecutorService;
Madan Jampani08bf17b2015-05-06 16:25:26 -070082import java.util.concurrent.ScheduledFuture;
Madan Jampani86940d92015-05-06 11:47:57 -070083import java.util.concurrent.TimeUnit;
84import java.util.concurrent.atomic.AtomicInteger;
85import java.util.stream.Collectors;
86
Madan Jampani86940d92015-05-06 11:47:57 -070087import static com.google.common.base.Strings.isNullOrEmpty;
88import static org.onlab.util.Tools.get;
89import static org.onlab.util.Tools.groupedThreads;
90import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
91import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
92import static org.slf4j.LoggerFactory.getLogger;
93
/**
 * Manages inventory of flow rules using a distributed state management protocol.
 */
97@Component(immediate = true, enabled = true)
98@Service
99public class NewDistributedFlowRuleStore
100 extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
101 implements FlowRuleStore {
102
103 private final Logger log = getLogger(getClass());
104
105 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
106 private static final boolean DEFAULT_BACKUP_ENABLED = true;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700107 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
Madan Jampani86940d92015-05-06 11:47:57 -0700108 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
109
110 @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
111 label = "Number of threads in the message handler pool")
112 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
113
114 @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
115 label = "Indicates whether backups are enabled or not")
116 private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
117
Madan Jampani08bf17b2015-05-06 16:25:26 -0700118 @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
119 label = "Delay in ms between successive backup runs")
120 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
121
Madan Jampani86940d92015-05-06 11:47:57 -0700122 private InternalFlowTable flowTable = new InternalFlowTable();
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 protected ReplicaInfoService replicaInfoManager;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 protected ClusterCommunicationService clusterCommunicator;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 protected ClusterService clusterService;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
134 protected DeviceService deviceService;
135
136 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 protected CoreService coreService;
138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140 protected ComponentConfigService configService;
141
142 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
143 protected MastershipService mastershipService;
144
145 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
146 private ExecutorService messageHandlingExecutor;
147
Madan Jampani08bf17b2015-05-06 16:25:26 -0700148 private ScheduledFuture<?> backupTask;
Madan Jampani86940d92015-05-06 11:47:57 -0700149 private final ScheduledExecutorService backupSenderExecutor =
150 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
151
152 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
153 @Override
154 protected void setupKryoPool() {
155 serializerPool = KryoNamespace.newBuilder()
156 .register(DistributedStoreSerializers.STORE_COMMON)
157 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
Madan Jampani86940d92015-05-06 11:47:57 -0700158 .build();
159 }
160 };
161
162 private IdGenerator idGenerator;
163 private NodeId local;
164
165 @Activate
166 public void activate(ComponentContext context) {
167 configService.registerProperties(getClass());
168
169 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
170
171 local = clusterService.getLocalNode().id();
172
173 messageHandlingExecutor = Executors.newFixedThreadPool(
174 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
175
176 registerMessageHandlers(messageHandlingExecutor);
177
Madan Jampani08bf17b2015-05-06 16:25:26 -0700178 if (backupEnabled) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700179 replicaInfoManager.addListener(flowTable);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700180 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
181 flowTable::backup,
182 0,
183 backupPeriod,
184 TimeUnit.MILLISECONDS);
185 }
Madan Jampani86940d92015-05-06 11:47:57 -0700186
187 logConfig("Started");
188 }
189
190 @Deactivate
191 public void deactivate(ComponentContext context) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700192 if (backupEnabled) {
193 replicaInfoManager.removeListener(flowTable);
194 backupTask.cancel(true);
195 }
Madan Jampani86940d92015-05-06 11:47:57 -0700196 configService.unregisterProperties(getClass(), false);
197 unregisterMessageHandlers();
198 messageHandlingExecutor.shutdownNow();
199 backupSenderExecutor.shutdownNow();
200 log.info("Stopped");
201 }
202
203 @SuppressWarnings("rawtypes")
204 @Modified
205 public void modified(ComponentContext context) {
206 if (context == null) {
207 backupEnabled = DEFAULT_BACKUP_ENABLED;
208 logConfig("Default config");
209 return;
210 }
211
212 Dictionary properties = context.getProperties();
213 int newPoolSize;
214 boolean newBackupEnabled;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700215 int newBackupPeriod;
Madan Jampani86940d92015-05-06 11:47:57 -0700216 try {
217 String s = get(properties, "msgHandlerPoolSize");
218 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
219
220 s = get(properties, "backupEnabled");
221 newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
222
Madan Jampani08bf17b2015-05-06 16:25:26 -0700223 s = get(properties, "backupPeriod");
224 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
225
Madan Jampani86940d92015-05-06 11:47:57 -0700226 } catch (NumberFormatException | ClassCastException e) {
227 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
228 newBackupEnabled = DEFAULT_BACKUP_ENABLED;
Madan Jampani08bf17b2015-05-06 16:25:26 -0700229 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Madan Jampani86940d92015-05-06 11:47:57 -0700230 }
231
Madan Jampani08bf17b2015-05-06 16:25:26 -0700232 boolean restartBackupTask = false;
Madan Jampani86940d92015-05-06 11:47:57 -0700233 if (newBackupEnabled != backupEnabled) {
234 backupEnabled = newBackupEnabled;
Madan Jampanif7536ab2015-05-07 23:23:23 -0700235 if (!backupEnabled) {
236 replicaInfoManager.removeListener(flowTable);
237 if (backupTask != null) {
238 backupTask.cancel(false);
239 backupTask = null;
240 }
241 } else {
242 replicaInfoManager.addListener(flowTable);
Madan Jampani08bf17b2015-05-06 16:25:26 -0700243 }
244 restartBackupTask = backupEnabled;
245 }
246 if (newBackupPeriod != backupPeriod) {
247 backupPeriod = newBackupPeriod;
248 restartBackupTask = backupEnabled;
249 }
250 if (restartBackupTask) {
251 if (backupTask != null) {
252 // cancel previously running task
253 backupTask.cancel(false);
254 }
255 backupTask = backupSenderExecutor.scheduleWithFixedDelay(
256 flowTable::backup,
257 0,
258 backupPeriod,
259 TimeUnit.MILLISECONDS);
Madan Jampani86940d92015-05-06 11:47:57 -0700260 }
261 if (newPoolSize != msgHandlerPoolSize) {
262 msgHandlerPoolSize = newPoolSize;
263 ExecutorService oldMsgHandler = messageHandlingExecutor;
264 messageHandlingExecutor = Executors.newFixedThreadPool(
265 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
266
267 // replace previously registered handlers.
268 registerMessageHandlers(messageHandlingExecutor);
269 oldMsgHandler.shutdown();
270 }
271 logConfig("Reconfigured");
272 }
273
274 private void registerMessageHandlers(ExecutorService executor) {
275
276 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
277 clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
278 REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
279 clusterCommunicator.addSubscriber(
280 GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
281 clusterCommunicator.addSubscriber(
282 GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
283 clusterCommunicator.addSubscriber(
284 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
285 clusterCommunicator.addSubscriber(
286 REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
287 clusterCommunicator.addSubscriber(
Madan Jampani654b58a2015-05-22 11:28:11 -0700288 FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, SERIALIZER::encode, executor);
Madan Jampani86940d92015-05-06 11:47:57 -0700289 }
290
291 private void unregisterMessageHandlers() {
292 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
293 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
294 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
295 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
296 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
297 clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
298 }
299
300 private void logConfig(String prefix) {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700301 log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}, backupPeriod = {}",
302 prefix, msgHandlerPoolSize, backupEnabled, backupPeriod);
Madan Jampani86940d92015-05-06 11:47:57 -0700303 }
304
305 // This is not a efficient operation on a distributed sharded
306 // flow store. We need to revisit the need for this operation or at least
307 // make it device specific.
308 @Override
309 public int getFlowRuleCount() {
310 AtomicInteger sum = new AtomicInteger(0);
311 deviceService.getDevices().forEach(device -> sum.addAndGet(Iterables.size(getFlowEntries(device.id()))));
312 return sum.get();
313 }
314
315 @Override
316 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700317 NodeId master = mastershipService.getMasterFor(rule.deviceId());
Madan Jampani86940d92015-05-06 11:47:57 -0700318
319 if (master == null) {
320 log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
321 return null;
322 }
323
324 if (Objects.equal(local, master)) {
325 return flowTable.getFlowEntry(rule);
326 }
327
328 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
329 master, rule.deviceId());
330
331 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
332 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
333 SERIALIZER::encode,
334 SERIALIZER::decode,
335 master),
336 FLOW_RULE_STORE_TIMEOUT_MILLIS,
337 TimeUnit.MILLISECONDS,
338 null);
339 }
340
341 @Override
342 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700343 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700344
345 if (master == null) {
346 log.warn("Failed to getFlowEntries: No master for {}", deviceId);
347 return Collections.emptyList();
348 }
349
350 if (Objects.equal(local, master)) {
351 return flowTable.getFlowEntries(deviceId);
352 }
353
354 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
355 master, deviceId);
356
357 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
358 FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
359 SERIALIZER::encode,
360 SERIALIZER::decode,
361 master),
362 FLOW_RULE_STORE_TIMEOUT_MILLIS,
363 TimeUnit.MILLISECONDS,
364 Collections.emptyList());
365 }
366
367 @Override
368 public void storeFlowRule(FlowRule rule) {
369 storeBatch(new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700370 Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
Madan Jampani86940d92015-05-06 11:47:57 -0700371 rule.deviceId(), idGenerator.getNewId()));
372 }
373
374 @Override
375 public void storeBatch(FlowRuleBatchOperation operation) {
376 if (operation.getOperations().isEmpty()) {
377 notifyDelegate(FlowRuleBatchEvent.completed(
378 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
379 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
380 return;
381 }
382
383 DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700384 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700385
386 if (master == null) {
387 log.warn("No master for {} : flows will be marked for removal", deviceId);
388
389 updateStoreInternal(operation);
390
391 notifyDelegate(FlowRuleBatchEvent.completed(
392 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
393 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
394 return;
395 }
396
397 if (Objects.equal(local, master)) {
398 storeBatchInternal(operation);
399 return;
400 }
401
402 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
403 master, deviceId);
404
Madan Jampani175e8fd2015-05-20 14:10:45 -0700405 clusterCommunicator.unicast(operation,
406 APPLY_BATCH_FLOWS,
407 SERIALIZER::encode,
408 master)
409 .whenComplete((result, error) -> {
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700410 if (error != null) {
Madan Jampani15f1bc42015-05-28 10:51:52 -0700411 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
Madan Jampani86940d92015-05-06 11:47:57 -0700412
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700413 Set<FlowRule> allFailures = operation.getOperations()
414 .stream()
415 .map(op -> op.target())
416 .collect(Collectors.toSet());
Madan Jampani86940d92015-05-06 11:47:57 -0700417
Thomas Vachuskaa267ce42015-05-27 16:14:23 -0700418 notifyDelegate(FlowRuleBatchEvent.completed(
419 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
420 new CompletedBatchOperation(false, allFailures, deviceId)));
421 }
Madan Jampani175e8fd2015-05-20 14:10:45 -0700422 });
Madan Jampani86940d92015-05-06 11:47:57 -0700423 }
424
425 private void storeBatchInternal(FlowRuleBatchOperation operation) {
426
427 final DeviceId did = operation.deviceId();
428 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
429 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
430 if (currentOps.isEmpty()) {
431 batchOperationComplete(FlowRuleBatchEvent.completed(
432 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
433 new CompletedBatchOperation(true, Collections.emptySet(), did)));
434 return;
435 }
436
437 notifyDelegate(FlowRuleBatchEvent.requested(new
438 FlowRuleBatchRequest(operation.id(),
439 currentOps), operation.deviceId()));
440 }
441
442 private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
443 return operation.getOperations().stream().map(
444 op -> {
445 StoredFlowEntry entry;
446 switch (op.operator()) {
447 case ADD:
448 entry = new DefaultFlowEntry(op.target());
449 // always add requested FlowRule
450 // Note: 2 equal FlowEntry may have different treatment
451 flowTable.remove(entry.deviceId(), entry);
452 flowTable.add(entry);
453
454 return op;
455 case REMOVE:
456 entry = flowTable.getFlowEntry(op.target());
457 if (entry != null) {
458 entry.setState(FlowEntryState.PENDING_REMOVE);
459 return op;
460 }
461 break;
462 case MODIFY:
463 //TODO: figure this out at some point
464 break;
465 default:
466 log.warn("Unknown flow operation operator: {}", op.operator());
467 }
468 return null;
469 }
470 ).filter(op -> op != null).collect(Collectors.toSet());
471 }
472
473 @Override
474 public void deleteFlowRule(FlowRule rule) {
475 storeBatch(
476 new FlowRuleBatchOperation(
Sho SHIMIZU98ffca82015-05-11 08:39:24 -0700477 Collections.singletonList(
Madan Jampani86940d92015-05-06 11:47:57 -0700478 new FlowRuleBatchEntry(
479 FlowRuleOperation.REMOVE,
480 rule)), rule.deviceId(), idGenerator.getNewId()));
481 }
482
483 @Override
484 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700485 NodeId master = mastershipService.getMasterFor(rule.deviceId());
486 if (Objects.equal(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700487 return addOrUpdateFlowRuleInternal(rule);
488 }
489
490 log.warn("Tried to update FlowRule {} state,"
491 + " while the Node was not the master.", rule);
492 return null;
493 }
494
495 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
496 // check if this new rule is an update to an existing entry
497 StoredFlowEntry stored = flowTable.getFlowEntry(rule);
498 if (stored != null) {
499 stored.setBytes(rule.bytes());
500 stored.setLife(rule.life());
501 stored.setPackets(rule.packets());
502 if (stored.state() == FlowEntryState.PENDING_ADD) {
503 stored.setState(FlowEntryState.ADDED);
504 return new FlowRuleEvent(Type.RULE_ADDED, rule);
505 }
506 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
507 }
508
509 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
510 // TODO: also update backup if the behavior is correct.
511 flowTable.add(rule);
512 return null;
513 }
514
515 @Override
516 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
517 final DeviceId deviceId = rule.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700518 NodeId master = mastershipService.getMasterFor(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700519
520 if (Objects.equal(local, master)) {
521 // bypass and handle it locally
522 return removeFlowRuleInternal(rule);
523 }
524
525 if (master == null) {
526 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
527 // TODO: revisit if this should be null (="no-op") or Exception
528 return null;
529 }
530
531 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
532 master, deviceId);
533
534 return Futures.get(clusterCommunicator.sendAndReceive(
535 rule,
536 REMOVE_FLOW_ENTRY,
537 SERIALIZER::encode,
538 SERIALIZER::decode,
539 master),
540 FLOW_RULE_STORE_TIMEOUT_MILLIS,
541 TimeUnit.MILLISECONDS,
542 RuntimeException.class);
543 }
544
545 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
546 final DeviceId deviceId = rule.deviceId();
547 // This is where one could mark a rule as removed and still keep it in the store.
548 final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
549 return removed ? new FlowRuleEvent(RULE_REMOVED, rule) : null;
550 }
551
552 @Override
553 public void batchOperationComplete(FlowRuleBatchEvent event) {
554 //FIXME: need a per device pending response
555 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
556 if (nodeId == null) {
557 notifyDelegate(event);
558 } else {
559 // TODO check unicast return value
560 clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
561 //error log: log.warn("Failed to respond to peer for batch operation result");
562 }
563 }
564
565 private final class OnStoreBatch implements ClusterMessageHandler {
566
567 @Override
568 public void handle(final ClusterMessage message) {
569 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
570 log.debug("received batch request {}", operation);
571
572 final DeviceId deviceId = operation.deviceId();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700573 NodeId master = mastershipService.getMasterFor(deviceId);
574 if (!Objects.equal(local, master)) {
Madan Jampani86940d92015-05-06 11:47:57 -0700575 Set<FlowRule> failures = new HashSet<>(operation.size());
576 for (FlowRuleBatchEntry op : operation.getOperations()) {
577 failures.add(op.target());
578 }
579 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
580 // This node is no longer the master, respond as all failed.
581 // TODO: we might want to wrap response in envelope
582 // to distinguish sw programming failure and hand over
583 // it make sense in the latter case to retry immediately.
584 message.respond(SERIALIZER.encode(allFailed));
585 return;
586 }
587
588 pendingResponses.put(operation.id(), message.sender());
589 storeBatchInternal(operation);
590 }
591 }
592
Madan Jampanif7536ab2015-05-07 23:23:23 -0700593 private class InternalFlowTable implements ReplicaInfoEventListener {
Madan Jampani86940d92015-05-06 11:47:57 -0700594
Madan Jampani5c3766c2015-06-02 15:54:41 -0700595 private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>
596 flowEntries = Maps.newConcurrentMap();
Madan Jampani86940d92015-05-06 11:47:57 -0700597
598 private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
599 private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
600 private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
601
Madan Jampanif7536ab2015-05-07 23:23:23 -0700602 @Override
603 public void event(ReplicaInfoEvent event) {
Madan Jampania98bf932015-06-02 12:01:36 -0700604 if (!backupEnabled) {
605 return;
606 }
Madan Jampanif7536ab2015-05-07 23:23:23 -0700607 if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
608 DeviceId deviceId = event.subject();
Madan Jampani6bd2d9f2015-05-14 14:10:42 -0700609 NodeId master = mastershipService.getMasterFor(deviceId);
610 if (!Objects.equal(local, master)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700611 // ignore since this event is for a device this node does not manage.
612 return;
613 }
Madan Jampani7267c552015-05-20 22:39:17 -0700614 NodeId newBackupNode = getBackupNode(deviceId);
615 NodeId currentBackupNode = lastBackupNodes.get(deviceId);
616 if (Objects.equal(newBackupNode, currentBackupNode)) {
Madan Jampanif7536ab2015-05-07 23:23:23 -0700617 // ignore since backup location hasn't changed.
618 return;
619 }
Madan Jampani7267c552015-05-20 22:39:17 -0700620 if (currentBackupNode != null && newBackupNode == null) {
621 // Current backup node is most likely down and no alternate backup node
622 // has been chosen. Clear current backup location so that we can resume
623 // backups when either current backup comes online or a different backup node
624 // is chosen.
625 log.warn("Lost backup location {} for deviceId {} and no alternate backup node exists. "
626 + "Flows can be lost if the master goes down", currentBackupNode, deviceId);
627 lastBackupNodes.remove(deviceId);
628 lastBackupTimes.remove(deviceId);
629 return;
630 // TODO: Pick any available node as backup and ensure hand-off occurs when
631 // a new master is elected.
632 }
633 log.info("Backup location for {} has changed from {} to {}.",
634 deviceId, currentBackupNode, newBackupNode);
635 backupSenderExecutor.schedule(() -> backupFlowEntries(newBackupNode, Sets.newHashSet(deviceId)),
Madan Jampani03062682015-05-19 17:57:51 -0700636 0,
637 TimeUnit.SECONDS);
Madan Jampanif7536ab2015-05-07 23:23:23 -0700638 }
639 }
640
641 private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
642 log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
Madan Jampani654b58a2015-05-22 11:28:11 -0700643 Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
Madan Jampanif7536ab2015-05-07 23:23:23 -0700644 Maps.newConcurrentMap();
Madan Jampani85a9b0d2015-06-03 17:15:44 -0700645 deviceIds.forEach(id -> deviceFlowEntries.put(id, ImmutableMap.copyOf(getFlowTable(id))));
Madan Jampani654b58a2015-05-22 11:28:11 -0700646 clusterCommunicator.<Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>, Set<DeviceId>>sendAndReceive(
647 deviceFlowEntries,
648 FLOW_TABLE_BACKUP,
649 SERIALIZER::encode,
650 SERIALIZER::decode,
651 nodeId)
652 .whenComplete((backedupDevices, error) -> {
653 Set<DeviceId> devicesNotBackedup = error != null ?
654 deviceFlowEntries.keySet() :
655 Sets.difference(deviceFlowEntries.keySet(), backedupDevices);
656 if (devicesNotBackedup.size() > 0) {
Madan Jampani85a9b0d2015-06-03 17:15:44 -0700657 log.warn("Failed to backup devices: {}. Reason: {}",
658 devicesNotBackedup, error.getMessage());
Madan Jampani654b58a2015-05-22 11:28:11 -0700659 }
660 if (backedupDevices != null) {
661 backedupDevices.forEach(id -> {
662 lastBackupTimes.put(id, System.currentTimeMillis());
663 lastBackupNodes.put(id, nodeId);
664 });
665 }
666 });
Madan Jampanif7536ab2015-05-07 23:23:23 -0700667 }
668
Madan Jampani86940d92015-05-06 11:47:57 -0700669 /**
670 * Returns the flow table for specified device.
671 *
672 * @param deviceId identifier of the device
673 * @return Map representing Flow Table of given device.
674 */
Madan Jampani5c3766c2015-06-02 15:54:41 -0700675 private Map<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
676 return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
Madan Jampani86940d92015-05-06 11:47:57 -0700677 }
678
679 private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
680 return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
681 }
682
683 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
684 Set<StoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.deviceId(), rule.id());
685 return flowEntries.stream()
686 .filter(entry -> Objects.equal(entry, rule))
687 .findAny()
688 .orElse(null);
689 }
690
691 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
692 Set<FlowEntry> result = Sets.newHashSet();
693 getFlowTable(deviceId).values().forEach(result::addAll);
694 return result;
695 }
696
697 public StoredFlowEntry getFlowEntry(FlowRule rule) {
698 return getFlowEntryInternal(rule);
699 }
700
701 public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
702 return getFlowEntriesInternal(deviceId);
703 }
704
705 public void add(FlowEntry rule) {
706 getFlowEntriesInternal(rule.deviceId(), rule.id()).add((StoredFlowEntry) rule);
707 lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
708 }
709
710 public boolean remove(DeviceId deviceId, FlowEntry rule) {
711 try {
712 return getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
713 } finally {
714 lastUpdateTimes.put(deviceId, System.currentTimeMillis());
715 }
716 }
717
718 private NodeId getBackupNode(DeviceId deviceId) {
719 List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
720 // pick the standby which is most likely to become next master
721 return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
722 }
723
724 private void backup() {
Madan Jampani08bf17b2015-05-06 16:25:26 -0700725 if (!backupEnabled) {
726 return;
727 }
Madan Jampani86940d92015-05-06 11:47:57 -0700728 try {
729 // determine the set of devices that we need to backup during this run.
730 Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
731 .stream()
732 .filter(deviceId -> {
733 Long lastBackupTime = lastBackupTimes.get(deviceId);
734 Long lastUpdateTime = lastUpdateTimes.get(deviceId);
735 NodeId lastBackupNode = lastBackupNodes.get(deviceId);
Madan Jampani7267c552015-05-20 22:39:17 -0700736 NodeId newBackupNode = getBackupNode(deviceId);
Madan Jampani86940d92015-05-06 11:47:57 -0700737 return lastBackupTime == null
Madan Jampani7267c552015-05-20 22:39:17 -0700738 || !Objects.equal(lastBackupNode, newBackupNode)
Madan Jampani86940d92015-05-06 11:47:57 -0700739 || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
740 })
741 .collect(Collectors.toSet());
742
743 // compute a mapping from node to the set of devices whose flow entries it should backup
744 Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
745 devicesToBackup.forEach(deviceId -> {
746 NodeId backupLocation = getBackupNode(deviceId);
747 if (backupLocation != null) {
748 devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
749 .add(deviceId);
750 }
751 });
Madan Jampani86940d92015-05-06 11:47:57 -0700752 // send the device flow entries to their respective backup nodes
Madan Jampanif7536ab2015-05-07 23:23:23 -0700753 devicesToBackupByNode.forEach(this::backupFlowEntries);
Madan Jampani86940d92015-05-06 11:47:57 -0700754 } catch (Exception e) {
755 log.error("Backup failed.", e);
756 }
757 }
758
Madan Jampani654b58a2015-05-22 11:28:11 -0700759 private Set<DeviceId> onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
Madan Jampani7267c552015-05-20 22:39:17 -0700760 log.debug("Received flowEntries for {} to backup", flowTables.keySet());
Madan Jampani654b58a2015-05-22 11:28:11 -0700761 Set<DeviceId> backedupDevices = Sets.newHashSet();
762 try {
Madan Jampania98bf932015-06-02 12:01:36 -0700763 flowTables.forEach((deviceId, deviceFlowTable) -> {
764 // Only process those devices are that not managed by the local node.
765 if (!Objects.equal(local, mastershipService.getMasterFor(deviceId))) {
766 Map<FlowId, Set<StoredFlowEntry>> backupFlowTable = getFlowTable(deviceId);
767 backupFlowTable.clear();
768 backupFlowTable.putAll(deviceFlowTable);
769 backedupDevices.add(deviceId);
770 }
Madan Jampani08bf17b2015-05-06 16:25:26 -0700771 });
Madan Jampani654b58a2015-05-22 11:28:11 -0700772 } catch (Exception e) {
773 log.warn("Failure processing backup request", e);
774 }
775 return backedupDevices;
Madan Jampani86940d92015-05-06 11:47:57 -0700776 }
777 }
778}