blob: 27ca7f792232244a9e3c0ccf79b961e93f874d40 [file] [log] [blame]
Yoonseon Hane026a7f2017-08-23 06:05:25 +09001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.store.virtual.impl;
18
19import com.google.common.base.MoreObjects;
20import com.google.common.collect.ComparisonChain;
21import com.google.common.collect.ImmutableList;
22import com.google.common.collect.Iterables;
23import com.google.common.collect.Maps;
24import com.google.common.util.concurrent.Futures;
Yoonseon Hane026a7f2017-08-23 06:05:25 +090025import org.onlab.util.KryoNamespace;
26import org.onlab.util.Tools;
27import org.onosproject.cfg.ComponentConfigService;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.core.CoreService;
31import org.onosproject.core.IdGenerator;
32import org.onosproject.incubator.net.virtual.NetworkId;
33import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
34import org.onosproject.incubator.net.virtual.VirtualNetworkService;
35import org.onosproject.incubator.store.virtual.impl.primitives.VirtualDeviceId;
36import org.onosproject.incubator.store.virtual.impl.primitives.VirtualFlowEntry;
37import org.onosproject.incubator.store.virtual.impl.primitives.VirtualFlowRule;
38import org.onosproject.incubator.store.virtual.impl.primitives.VirtualFlowRuleBatchEvent;
39import org.onosproject.incubator.store.virtual.impl.primitives.VirtualFlowRuleBatchOperation;
40import org.onosproject.mastership.MastershipService;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.device.DeviceService;
43import org.onosproject.net.flow.BatchOperationEntry;
44import org.onosproject.net.flow.CompletedBatchOperation;
45import org.onosproject.net.flow.DefaultFlowEntry;
46import org.onosproject.net.flow.FlowEntry;
47import org.onosproject.net.flow.FlowId;
48import org.onosproject.net.flow.FlowRule;
Yoonseon Hane026a7f2017-08-23 06:05:25 +090049import org.onosproject.net.flow.FlowRuleEvent;
50import org.onosproject.net.flow.FlowRuleService;
51import org.onosproject.net.flow.FlowRuleStoreDelegate;
52import org.onosproject.net.flow.StoredFlowEntry;
53import org.onosproject.net.flow.TableStatisticsEntry;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070054import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
55import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
56import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
57import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Yoonseon Hane026a7f2017-08-23 06:05:25 +090058import org.onosproject.store.Timestamp;
59import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
60import org.onosproject.store.cluster.messaging.ClusterMessage;
61import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
62import org.onosproject.store.cluster.messaging.MessageSubject;
63import org.onosproject.store.serializers.KryoNamespaces;
64import org.onosproject.store.service.EventuallyConsistentMap;
65import org.onosproject.store.service.EventuallyConsistentMapEvent;
66import org.onosproject.store.service.EventuallyConsistentMapListener;
67import org.onosproject.store.service.Serializer;
68import org.onosproject.store.service.StorageService;
69import org.onosproject.store.service.WallClockTimestamp;
70import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070071import org.osgi.service.component.annotations.Activate;
72import org.osgi.service.component.annotations.Component;
73import org.osgi.service.component.annotations.Deactivate;
74import org.osgi.service.component.annotations.Modified;
75import org.osgi.service.component.annotations.Reference;
76import org.osgi.service.component.annotations.ReferenceCardinality;
Yoonseon Hane026a7f2017-08-23 06:05:25 +090077import org.slf4j.Logger;
78
79import java.util.Collections;
80import java.util.Dictionary;
81import java.util.HashSet;
82import java.util.List;
83import java.util.Map;
84import java.util.Objects;
85import java.util.Set;
86import java.util.concurrent.ExecutorService;
87import java.util.concurrent.Executors;
88import java.util.concurrent.TimeUnit;
89import java.util.concurrent.atomic.AtomicInteger;
90import java.util.concurrent.atomic.AtomicReference;
91import java.util.stream.Collectors;
92
93import static com.google.common.base.Preconditions.checkArgument;
94import static com.google.common.base.Strings.isNullOrEmpty;
95import static org.onlab.util.Tools.get;
96import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -070097import static org.onosproject.incubator.store.virtual.impl.OsgiPropertyDefaults.*;
Yoonseon Hane026a7f2017-08-23 06:05:25 +090098import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
99import static org.slf4j.LoggerFactory.getLogger;
100
101/**
102 * Manages inventory of flow rules using a distributed state management protocol
103 * for virtual networks.
104 */
105//TODO: support backup and persistent mechanism
@Component(immediate = true, enabled = false, service = VirtualNetworkFlowRuleStore.class,
        property = {
                "messageHandlerThreadPoolSize:Integer=" + MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT,
                // The backup-period default belongs to the "backupPeriod" setting, which is
                // the key modified() reads and the name of the backing field.
                "backupPeriod:Integer=" + BACKUP_PERIOD_MILLIS_DEFAULT,
                "persistenceEnabled:Boolean=" + PERSISTENCE_ENABLED_DEFAULT,
        })
112
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900113public class DistributedVirtualFlowRuleStore
114 extends AbstractVirtualStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
115 implements VirtualNetworkFlowRuleStore {
116
117 private final Logger log = getLogger(getClass());
118
119 //TODO: confirm this working fine with multiple thread more than 1
120 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900121 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
122
123 private static final String FLOW_OP_TOPIC = "virtual-flow-ops-ids";
124
125 // MessageSubjects used by DistributedVirtualFlowRuleStore peer-peer communication.
126 private static final MessageSubject APPLY_BATCH_FLOWS
127 = new MessageSubject("virtual-peer-forward-apply-batch");
128 private static final MessageSubject GET_FLOW_ENTRY
129 = new MessageSubject("virtual-peer-forward-get-flow-entry");
130 private static final MessageSubject GET_DEVICE_FLOW_ENTRIES
131 = new MessageSubject("virtual-peer-forward-get-device-flow-entries");
132 private static final MessageSubject REMOVE_FLOW_ENTRY
133 = new MessageSubject("virtual-peer-forward-remove-flow-entry");
134 private static final MessageSubject REMOTE_APPLY_COMPLETED
135 = new MessageSubject("virtual-peer-apply-completed");
136
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700137 //@Property(name = "msgHandlerThreadPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700138 // label = "Number of threads in the message handler pool")
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700139 private int msgHandlerThreadPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900140
Ray Milkeyb5646e62018-10-16 11:42:18 -0700141 //@Property(name = "backupPeriod", intValue = BACKUP_PERIOD_MILLIS,
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700142 // label = "Delay in ms between successive backup runs")
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700143 private int backupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700144 //@Property(name = "persistenceEnabled", boolValue = false,
145 // label = "Indicates whether or not changes in the flow table should be persisted to disk.")
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700146 private boolean persistenceEnabled = PERSISTENCE_ENABLED_DEFAULT;
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900147
148 private InternalFlowTable flowTable = new InternalFlowTable();
149
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700150 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900151 protected CoreService coreService;
152
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700153 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900154 protected ClusterService clusterService;
155
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700156 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900157 protected ClusterCommunicationService clusterCommunicator;
158
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700159 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900160 protected ComponentConfigService configService;
161
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700162 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900163 protected StorageService storageService;
164
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700165 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900166 protected VirtualNetworkService vnaService;
167
168 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
169 private ExecutorService messageHandlingExecutor;
170 private ExecutorService eventHandler;
171
172 private EventuallyConsistentMap<NetworkId, Map<DeviceId, List<TableStatisticsEntry>>> deviceTableStats;
173 private final EventuallyConsistentMapListener<NetworkId, Map<DeviceId, List<TableStatisticsEntry>>>
174 tableStatsListener = new InternalTableStatsListener();
175
176
177 protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
178 .register(KryoNamespaces.API)
179 .register(NetworkId.class)
180 .register(VirtualFlowRuleBatchOperation.class)
181 .register(VirtualFlowRuleBatchEvent.class)
182 .build());
183
184 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
185 .register(KryoNamespaces.API)
186 .register(MastershipBasedTimestamp.class);
187
188 private IdGenerator idGenerator;
189 private NodeId local;
190
191
192 @Activate
193 public void activate(ComponentContext context) {
194 configService.registerProperties(getClass());
195
196 idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
197
198 local = clusterService.getLocalNode().id();
199
200 eventHandler = Executors.newSingleThreadExecutor(
201 groupedThreads("onos/virtual-flow", "event-handler", log));
202 messageHandlingExecutor = Executors.newFixedThreadPool(
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700203 msgHandlerThreadPoolSize, groupedThreads("onos/store/virtual-flow", "message-handlers", log));
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900204
205 registerMessageHandlers(messageHandlingExecutor);
206
207 deviceTableStats = storageService
208 .<NetworkId, Map<DeviceId, List<TableStatisticsEntry>>>eventuallyConsistentMapBuilder()
209 .withName("onos-virtual-flow-table-stats")
210 .withSerializer(serializerBuilder)
211 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
212 .withTimestampProvider((k, v) -> new WallClockTimestamp())
213 .withTombstonesDisabled()
214 .build();
215 deviceTableStats.addListener(tableStatsListener);
216
217 logConfig("Started");
218 }
219
220 @Deactivate
221 public void deactivate(ComponentContext context) {
222 configService.unregisterProperties(getClass(), false);
223 unregisterMessageHandlers();
224 deviceTableStats.removeListener(tableStatsListener);
225 deviceTableStats.destroy();
226 eventHandler.shutdownNow();
227 messageHandlingExecutor.shutdownNow();
228 log.info("Stopped");
229 }
230
231 @SuppressWarnings("rawtypes")
232 @Modified
233 public void modified(ComponentContext context) {
234 if (context == null) {
235 logConfig("Default config");
236 return;
237 }
238
239 Dictionary properties = context.getProperties();
240 int newPoolSize;
241 int newBackupPeriod;
242 try {
243 String s = get(properties, "msgHandlerPoolSize");
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700244 newPoolSize = isNullOrEmpty(s) ? msgHandlerThreadPoolSize : Integer.parseInt(s.trim());
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900245
246 s = get(properties, "backupPeriod");
247 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
248 } catch (NumberFormatException | ClassCastException e) {
249 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700250 newBackupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900251 }
252
253 boolean restartBackupTask = false;
254
255 if (newBackupPeriod != backupPeriod) {
256 backupPeriod = newBackupPeriod;
257 restartBackupTask = true;
258 }
259 if (restartBackupTask) {
260 log.warn("Currently, backup tasks are not supported.");
261 }
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700262 if (newPoolSize != msgHandlerThreadPoolSize) {
263 msgHandlerThreadPoolSize = newPoolSize;
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900264 ExecutorService oldMsgHandler = messageHandlingExecutor;
265 messageHandlingExecutor = Executors.newFixedThreadPool(
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700266 msgHandlerThreadPoolSize, groupedThreads("onos/store/virtual-flow", "message-handlers", log));
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900267
268 // replace previously registered handlers.
269 registerMessageHandlers(messageHandlingExecutor);
270 oldMsgHandler.shutdown();
271 }
272
273 logConfig("Reconfigured");
274 }
275
276 @Override
277 public int getFlowRuleCount(NetworkId networkId) {
278 AtomicInteger sum = new AtomicInteger(0);
279 DeviceService deviceService = vnaService.get(networkId, DeviceService.class);
280 deviceService.getDevices()
281 .forEach(device -> sum.addAndGet(
282 Iterables.size(getFlowEntries(networkId, device.id()))));
283 return sum.get();
284 }
285
286 @Override
287 public FlowEntry getFlowEntry(NetworkId networkId, FlowRule rule) {
288 MastershipService mastershipService =
289 vnaService.get(networkId, MastershipService.class);
290 NodeId master = mastershipService.getMasterFor(rule.deviceId());
291
292 if (master == null) {
293 log.debug("Failed to getFlowEntry: No master for {}, vnet {}",
294 rule.deviceId(), networkId);
295 return null;
296 }
297
298 if (Objects.equals(local, master)) {
299 return flowTable.getFlowEntry(networkId, rule);
300 }
301
302 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) " +
303 "for device {}, vnet {}",
304 master, rule.deviceId(), networkId);
305
306 VirtualFlowRule vRule = new VirtualFlowRule(networkId, rule);
307
308 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(vRule,
309 GET_FLOW_ENTRY,
310 serializer::encode,
311 serializer::decode,
312 master),
313 FLOW_RULE_STORE_TIMEOUT_MILLIS,
314 TimeUnit.MILLISECONDS,
315 null);
316 }
317
318 @Override
319 public Iterable<FlowEntry> getFlowEntries(NetworkId networkId, DeviceId deviceId) {
320 MastershipService mastershipService =
321 vnaService.get(networkId, MastershipService.class);
322 NodeId master = mastershipService.getMasterFor(deviceId);
323
324 if (master == null) {
325 log.debug("Failed to getFlowEntries: No master for {}, vnet {}", deviceId, networkId);
326 return Collections.emptyList();
327 }
328
329 if (Objects.equals(local, master)) {
330 return flowTable.getFlowEntries(networkId, deviceId);
331 }
332
333 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
334 master, deviceId);
335
336 return Tools.futureGetOrElse(
337 clusterCommunicator.sendAndReceive(deviceId,
338 GET_DEVICE_FLOW_ENTRIES,
339 serializer::encode,
340 serializer::decode,
341 master),
342 FLOW_RULE_STORE_TIMEOUT_MILLIS,
343 TimeUnit.MILLISECONDS,
344 Collections.emptyList());
345 }
346
347 @Override
348 public void storeBatch(NetworkId networkId, FlowRuleBatchOperation operation) {
349 if (operation.getOperations().isEmpty()) {
350 notifyDelegate(networkId, FlowRuleBatchEvent.completed(
351 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
352 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
353 return;
354 }
355
356 DeviceId deviceId = operation.deviceId();
357 MastershipService mastershipService =
358 vnaService.get(networkId, MastershipService.class);
359 NodeId master = mastershipService.getMasterFor(deviceId);
360
361 if (master == null) {
362 log.warn("No master for {}, vnet {} : flows will be marked for removal", deviceId, networkId);
363
364 updateStoreInternal(networkId, operation);
365
366 notifyDelegate(networkId, FlowRuleBatchEvent.completed(
367 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
368 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
369 return;
370 }
371
372 if (Objects.equals(local, master)) {
373 storeBatchInternal(networkId, operation);
374 return;
375 }
376
377 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}, vent {}",
378 master, deviceId, networkId);
379
380 clusterCommunicator.unicast(new VirtualFlowRuleBatchOperation(networkId, operation),
381 APPLY_BATCH_FLOWS,
382 serializer::encode,
383 master)
384 .whenComplete((result, error) -> {
385 if (error != null) {
386 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
387
388 Set<FlowRule> allFailures = operation.getOperations()
389 .stream()
390 .map(BatchOperationEntry::target)
391 .collect(Collectors.toSet());
392
393 notifyDelegate(networkId, FlowRuleBatchEvent.completed(
394 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
395 new CompletedBatchOperation(false, allFailures, deviceId)));
396 }
397 });
398 }
399
400 @Override
401 public void batchOperationComplete(NetworkId networkId, FlowRuleBatchEvent event) {
402 //FIXME: need a per device pending response
403 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
404 if (nodeId == null) {
405 notifyDelegate(networkId, event);
406 } else {
407 // TODO check unicast return value
408 clusterCommunicator.unicast(new VirtualFlowRuleBatchEvent(networkId, event),
409 REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
410 //error log: log.warn("Failed to respond to peer for batch operation result");
411 }
412 }
413
414 @Override
415 public void deleteFlowRule(NetworkId networkId, FlowRule rule) {
416 storeBatch(networkId,
417 new FlowRuleBatchOperation(
418 Collections.singletonList(
419 new FlowRuleBatchEntry(
420 FlowRuleBatchEntry.FlowRuleOperation.REMOVE,
421 rule)), rule.deviceId(), idGenerator.getNewId()));
422 }
423
424 @Override
425 public FlowRuleEvent addOrUpdateFlowRule(NetworkId networkId, FlowEntry rule) {
426 MastershipService mastershipService =
427 vnaService.get(networkId, MastershipService.class);
428 NodeId master = mastershipService.getMasterFor(rule.deviceId());
429 if (Objects.equals(local, master)) {
430 return addOrUpdateFlowRuleInternal(networkId, rule);
431 }
432
433 log.warn("Tried to update FlowRule {} state,"
434 + " while the Node was not the master.", rule);
435 return null;
436 }
437
438 private FlowRuleEvent addOrUpdateFlowRuleInternal(NetworkId networkId, FlowEntry rule) {
439 // check if this new rule is an update to an existing entry
440 StoredFlowEntry stored = flowTable.getFlowEntry(networkId, rule);
441 if (stored != null) {
442 //FIXME modification of "stored" flow entry outside of flow table
443 stored.setBytes(rule.bytes());
444 stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
445 stored.setLiveType(rule.liveType());
446 stored.setPackets(rule.packets());
447 stored.setLastSeen();
448 if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
449 stored.setState(FlowEntry.FlowEntryState.ADDED);
450 return new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, rule);
451 }
452 return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule);
453 }
454
455 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
456 // TODO: also update backup if the behavior is correct.
457 flowTable.add(networkId, rule);
458 return null;
459 }
460
461 @Override
462 public FlowRuleEvent removeFlowRule(NetworkId networkId, FlowEntry rule) {
463 final DeviceId deviceId = rule.deviceId();
464
465 MastershipService mastershipService =
466 vnaService.get(networkId, MastershipService.class);
467 NodeId master = mastershipService.getMasterFor(deviceId);
468
469 if (Objects.equals(local, master)) {
470 // bypass and handle it locally
471 return removeFlowRuleInternal(new VirtualFlowEntry(networkId, rule));
472 }
473
474 if (master == null) {
475 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
476 // TODO: revisit if this should be null (="no-op") or Exception
477 return null;
478 }
479
480 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
481 master, deviceId);
482
483 return Futures.getUnchecked(clusterCommunicator.sendAndReceive(
484 new VirtualFlowEntry(networkId, rule),
485 REMOVE_FLOW_ENTRY,
486 serializer::encode,
487 serializer::decode,
488 master));
489 }
490
491 @Override
492 public FlowRuleEvent pendingFlowRule(NetworkId networkId, FlowEntry rule) {
493 MastershipService mastershipService =
494 vnaService.get(networkId, MastershipService.class);
495 if (mastershipService.isLocalMaster(rule.deviceId())) {
496 StoredFlowEntry stored = flowTable.getFlowEntry(networkId, rule);
497 if (stored != null &&
498 stored.state() != FlowEntry.FlowEntryState.PENDING_ADD) {
499 stored.setState(FlowEntry.FlowEntryState.PENDING_ADD);
500 return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule);
501 }
502 }
503 return null;
504 }
505
506 @Override
507 public void purgeFlowRules(NetworkId networkId) {
508 flowTable.purgeFlowRules(networkId);
509 }
510
511 @Override
512 public FlowRuleEvent updateTableStatistics(NetworkId networkId,
513 DeviceId deviceId,
514 List<TableStatisticsEntry> tableStats) {
515 if (deviceTableStats.get(networkId) == null) {
516 deviceTableStats.put(networkId, Maps.newConcurrentMap());
517 }
518 deviceTableStats.get(networkId).put(deviceId, tableStats);
519 return null;
520 }
521
522 @Override
523 public Iterable<TableStatisticsEntry> getTableStatistics(NetworkId networkId, DeviceId deviceId) {
524 MastershipService mastershipService =
525 vnaService.get(networkId, MastershipService.class);
526 NodeId master = mastershipService.getMasterFor(deviceId);
527
528 if (master == null) {
529 log.debug("Failed to getTableStats: No master for {}", deviceId);
530 return Collections.emptyList();
531 }
532
533 if (deviceTableStats.get(networkId) == null) {
534 deviceTableStats.put(networkId, Maps.newConcurrentMap());
535 }
536
537 List<TableStatisticsEntry> tableStats = deviceTableStats.get(networkId).get(deviceId);
538 if (tableStats == null) {
539 return Collections.emptyList();
540 }
541 return ImmutableList.copyOf(tableStats);
542 }
543
544 private void registerMessageHandlers(ExecutorService executor) {
545 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
546 clusterCommunicator.<VirtualFlowRuleBatchEvent>addSubscriber(
547 REMOTE_APPLY_COMPLETED, serializer::decode,
548 this::notifyDelicateByNetwork, executor);
549 clusterCommunicator.addSubscriber(
550 GET_FLOW_ENTRY, serializer::decode, this::getFlowEntryByNetwork,
551 serializer::encode, executor);
552 clusterCommunicator.addSubscriber(
553 GET_DEVICE_FLOW_ENTRIES, serializer::decode,
554 this::getFlowEntriesByNetwork,
555 serializer::encode, executor);
556 clusterCommunicator.addSubscriber(
557 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal,
558 serializer::encode, executor);
559 }
560
561 private void unregisterMessageHandlers() {
562 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
563 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
564 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
565 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
566 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
567 }
568
569
570 private void logConfig(String prefix) {
571 log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}",
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700572 prefix, msgHandlerThreadPoolSize, backupPeriod);
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900573 }
574
575 private void storeBatchInternal(NetworkId networkId, FlowRuleBatchOperation operation) {
576
577 final DeviceId did = operation.deviceId();
578 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
579 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(networkId, operation);
580 if (currentOps.isEmpty()) {
581 batchOperationComplete(networkId, FlowRuleBatchEvent.completed(
582 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
583 new CompletedBatchOperation(true, Collections.emptySet(), did)));
584 return;
585 }
586
587 //Confirm that flowrule service is created
588 vnaService.get(networkId, FlowRuleService.class);
589
590 notifyDelegate(networkId, FlowRuleBatchEvent.requested(new
591 FlowRuleBatchRequest(operation.id(),
592 currentOps), operation.deviceId()));
593 }
594
595 private Set<FlowRuleBatchEntry> updateStoreInternal(NetworkId networkId,
596 FlowRuleBatchOperation operation) {
597 return operation.getOperations().stream().map(
598 op -> {
599 StoredFlowEntry entry;
600 switch (op.operator()) {
601 case ADD:
602 entry = new DefaultFlowEntry(op.target());
603 // always add requested FlowRule
604 // Note: 2 equal FlowEntry may have different treatment
605 flowTable.remove(networkId, entry.deviceId(), entry);
606 flowTable.add(networkId, entry);
607
608 return op;
609 case REMOVE:
610 entry = flowTable.getFlowEntry(networkId, op.target());
611 if (entry != null) {
612 //FIXME modification of "stored" flow entry outside of flow table
613 entry.setState(FlowEntry.FlowEntryState.PENDING_REMOVE);
614 log.debug("Setting state of rule to pending remove: {}", entry);
615 return op;
616 }
617 break;
618 case MODIFY:
619 //TODO: figure this out at some point
620 break;
621 default:
622 log.warn("Unknown flow operation operator: {}", op.operator());
623 }
624 return null;
625 }
626 ).filter(Objects::nonNull).collect(Collectors.toSet());
627 }
628
629 private FlowRuleEvent removeFlowRuleInternal(VirtualFlowEntry rule) {
630 final DeviceId deviceId = rule.flowEntry().deviceId();
631 // This is where one could mark a rule as removed and still keep it in the store.
632 final FlowEntry removed = flowTable.remove(rule.networkId(), deviceId, rule.flowEntry());
633 // rule may be partial rule that is missing treatment, we should use rule from store instead
634 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
635 }
636
637 private final class OnStoreBatch implements ClusterMessageHandler {
638
639 @Override
640 public void handle(final ClusterMessage message) {
641 VirtualFlowRuleBatchOperation vOperation = serializer.decode(message.payload());
642 log.debug("received batch request {}", vOperation);
643
644 FlowRuleBatchOperation operation = vOperation.operation();
645
646 final DeviceId deviceId = operation.deviceId();
647 MastershipService mastershipService =
648 vnaService.get(vOperation.networkId(), MastershipService.class);
649 NodeId master = mastershipService.getMasterFor(deviceId);
650 if (!Objects.equals(local, master)) {
651 Set<FlowRule> failures = new HashSet<>(operation.size());
652 for (FlowRuleBatchEntry op : operation.getOperations()) {
653 failures.add(op.target());
654 }
655 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
656 // This node is no longer the master, respond as all failed.
657 // TODO: we might want to wrap response in envelope
658 // to distinguish sw programming failure and hand over
659 // it make sense in the latter case to retry immediately.
660 message.respond(serializer.encode(allFailed));
661 return;
662 }
663
664 pendingResponses.put(operation.id(), message.sender());
665 storeBatchInternal(vOperation.networkId(), operation);
666 }
667 }
668
669 /**
670 * Returns flow rule entry using virtual flow rule.
671 *
672 * @param rule an encapsulated flow rule to be queried
673 */
674 private FlowEntry getFlowEntryByNetwork(VirtualFlowRule rule) {
675 return flowTable.getFlowEntry(rule.networkId(), rule.rule());
676 }
677
678 /**
679 * returns flow entries using virtual device id.
680 *
681 * @param deviceId an encapsulated virtual device id
682 * @return a set of flow entries
683 */
684 private Set<FlowEntry> getFlowEntriesByNetwork(VirtualDeviceId deviceId) {
685 return flowTable.getFlowEntries(deviceId.networkId(), deviceId.deviceId());
686 }
687
688 /**
689 * span out Flow Rule Batch event according to virtual network id.
690 *
691 * @param event a event to be span out
692 */
693 private void notifyDelicateByNetwork(VirtualFlowRuleBatchEvent event) {
694 batchOperationComplete(event.networkId(), event.event());
695 }
696
697 private class InternalFlowTable {
698 //TODO replace the Map<V,V> with ExtendedSet
699 //TODO: support backup mechanism
700 private final Map<NetworkId, Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>>
701 flowEntriesMap = Maps.newConcurrentMap();
702 private final Map<NetworkId, Map<DeviceId, Long>> lastUpdateTimesMap = Maps.newConcurrentMap();
703
704 private Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
705 getFlowEntriesByNetwork(NetworkId networkId) {
706 return flowEntriesMap.computeIfAbsent(networkId, k -> Maps.newConcurrentMap());
707 }
708
709 private Map<DeviceId, Long> getLastUpdateTimesByNetwork(NetworkId networkId) {
710 return lastUpdateTimesMap.computeIfAbsent(networkId, k -> Maps.newConcurrentMap());
711 }
712
713 /**
714 * Returns the flow table for specified device.
715 *
716 * @param networkId virtual network identifier
717 * @param deviceId identifier of the device
718 * @return Map representing Flow Table of given device.
719 */
720 private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>
721 getFlowTable(NetworkId networkId, DeviceId deviceId) {
722 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
723 flowEntries = getFlowEntriesByNetwork(networkId);
724 if (persistenceEnabled) {
725 //TODO: support persistent
726 log.warn("Persistent is not supported");
727 return null;
728 } else {
729 return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
730 }
731 }
732
733 private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>
734 getFlowTableCopy(NetworkId networkId, DeviceId deviceId) {
735
736 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
737 flowEntries = getFlowEntriesByNetwork(networkId);
738 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = Maps.newHashMap();
739
740 if (persistenceEnabled) {
741 //TODO: support persistent
742 log.warn("Persistent is not supported");
743 return null;
744 } else {
745 flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap()).forEach((k, v) -> {
746 copy.put(k, Maps.newHashMap(v));
747 });
748 return copy;
749 }
750 }
751
752 private Map<StoredFlowEntry, StoredFlowEntry>
753 getFlowEntriesInternal(NetworkId networkId, DeviceId deviceId, FlowId flowId) {
754
755 return getFlowTable(networkId, deviceId)
756 .computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
757 }
758
759 private StoredFlowEntry getFlowEntryInternal(NetworkId networkId, FlowRule rule) {
760
761 return getFlowEntriesInternal(networkId, rule.deviceId(), rule.id()).get(rule);
762 }
763
764 private Set<FlowEntry> getFlowEntriesInternal(NetworkId networkId, DeviceId deviceId) {
765
766 return getFlowTable(networkId, deviceId).values().stream()
767 .flatMap(m -> m.values().stream())
768 .collect(Collectors.toSet());
769 }
770
771 public StoredFlowEntry getFlowEntry(NetworkId networkId, FlowRule rule) {
772 return getFlowEntryInternal(networkId, rule);
773 }
774
775 public Set<FlowEntry> getFlowEntries(NetworkId networkId, DeviceId deviceId) {
776
777 return getFlowEntriesInternal(networkId, deviceId);
778 }
779
780 public void add(NetworkId networkId, FlowEntry rule) {
781 Map<DeviceId, Long> lastUpdateTimes = getLastUpdateTimesByNetwork(networkId);
782
783 getFlowEntriesInternal(networkId, rule.deviceId(), rule.id())
784 .compute((StoredFlowEntry) rule, (k, stored) -> {
785 //TODO compare stored and rule timestamps
786 //TODO the key is not updated
787 return (StoredFlowEntry) rule;
788 });
789 lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
790 }
791
792 public FlowEntry remove(NetworkId networkId, DeviceId deviceId, FlowEntry rule) {
793 final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
794 Map<DeviceId, Long> lastUpdateTimes = getLastUpdateTimesByNetwork(networkId);
795
796 getFlowEntriesInternal(networkId, rule.deviceId(), rule.id())
797 .computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
798 if (rule instanceof DefaultFlowEntry) {
799 DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
800 if (stored instanceof DefaultFlowEntry) {
801 DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
802 if (toRemove.created() < storedEntry.created()) {
803 log.debug("Trying to remove more recent flow entry {} (stored: {})",
804 toRemove, stored);
805 // the key is not updated, removedRule remains null
806 return stored;
807 }
808 }
809 }
810 removedRule.set(stored);
811 return null;
812 });
813
814 if (removedRule.get() != null) {
815 lastUpdateTimes.put(deviceId, System.currentTimeMillis());
816 return removedRule.get();
817 } else {
818 return null;
819 }
820 }
821
822 public void purgeFlowRule(NetworkId networkId, DeviceId deviceId) {
823 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
824 flowEntries = getFlowEntriesByNetwork(networkId);
825 flowEntries.remove(deviceId);
826 }
827
828 public void purgeFlowRules(NetworkId networkId) {
829 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
830 flowEntries = getFlowEntriesByNetwork(networkId);
831 flowEntries.clear();
832 }
833 }
834
835 private class InternalTableStatsListener
836 implements EventuallyConsistentMapListener<NetworkId, Map<DeviceId, List<TableStatisticsEntry>>> {
837
838 @Override
839 public void event(EventuallyConsistentMapEvent<NetworkId, Map<DeviceId,
840 List<TableStatisticsEntry>>> event) {
841 //TODO: Generate an event to listeners (do we need?)
842 }
843 }
844
845 public final class MastershipBasedTimestamp implements Timestamp {
846
847 private final long termNumber;
848 private final long sequenceNumber;
849
850 /**
851 * Default constructor for serialization.
852 */
853 protected MastershipBasedTimestamp() {
854 this.termNumber = -1;
855 this.sequenceNumber = -1;
856 }
857
858 /**
859 * Default version tuple.
860 *
861 * @param termNumber the mastership termNumber
862 * @param sequenceNumber the sequenceNumber number within the termNumber
863 */
864 public MastershipBasedTimestamp(long termNumber, long sequenceNumber) {
865 this.termNumber = termNumber;
866 this.sequenceNumber = sequenceNumber;
867 }
868
869 @Override
870 public int compareTo(Timestamp o) {
871 checkArgument(o instanceof MastershipBasedTimestamp,
872 "Must be MastershipBasedTimestamp", o);
873 MastershipBasedTimestamp that = (MastershipBasedTimestamp) o;
874
875 return ComparisonChain.start()
876 .compare(this.termNumber, that.termNumber)
877 .compare(this.sequenceNumber, that.sequenceNumber)
878 .result();
879 }
880
881 @Override
882 public int hashCode() {
883 return Objects.hash(termNumber, sequenceNumber);
884 }
885
886 @Override
887 public boolean equals(Object obj) {
888 if (this == obj) {
889 return true;
890 }
891 if (!(obj instanceof MastershipBasedTimestamp)) {
892 return false;
893 }
894 MastershipBasedTimestamp that = (MastershipBasedTimestamp) obj;
895 return Objects.equals(this.termNumber, that.termNumber) &&
896 Objects.equals(this.sequenceNumber, that.sequenceNumber);
897 }
898
899 @Override
900 public String toString() {
901 return MoreObjects.toStringHelper(getClass())
902 .add("termNumber", termNumber)
903 .add("sequenceNumber", sequenceNumber)
904 .toString();
905 }
906
907 /**
908 * Returns the termNumber.
909 *
910 * @return termNumber
911 */
912 public long termNumber() {
913 return termNumber;
914 }
915
916 /**
917 * Returns the sequenceNumber number.
918 *
919 * @return sequenceNumber
920 */
921 public long sequenceNumber() {
922 return sequenceNumber;
923 }
924 }
925}