blob: e7533ca1457ffa566178739c91579038f0c6e7c4 [file] [log] [blame]
Yoonseon Hane026a7f2017-08-23 06:05:25 +09001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.store.virtual.impl;
18
19import com.google.common.base.MoreObjects;
20import com.google.common.collect.ComparisonChain;
21import com.google.common.collect.ImmutableList;
22import com.google.common.collect.Iterables;
23import com.google.common.collect.Maps;
24import com.google.common.util.concurrent.Futures;
Yoonseon Hane026a7f2017-08-23 06:05:25 +090025import org.onlab.util.KryoNamespace;
26import org.onlab.util.Tools;
27import org.onosproject.cfg.ComponentConfigService;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.core.CoreService;
31import org.onosproject.core.IdGenerator;
32import org.onosproject.incubator.net.virtual.NetworkId;
33import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
34import org.onosproject.incubator.net.virtual.VirtualNetworkService;
35import org.onosproject.incubator.store.virtual.impl.primitives.VirtualDeviceId;
36import org.onosproject.incubator.store.virtual.impl.primitives.VirtualFlowEntry;
37import org.onosproject.incubator.store.virtual.impl.primitives.VirtualFlowRule;
38import org.onosproject.incubator.store.virtual.impl.primitives.VirtualFlowRuleBatchEvent;
39import org.onosproject.incubator.store.virtual.impl.primitives.VirtualFlowRuleBatchOperation;
40import org.onosproject.mastership.MastershipService;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.device.DeviceService;
43import org.onosproject.net.flow.BatchOperationEntry;
44import org.onosproject.net.flow.CompletedBatchOperation;
45import org.onosproject.net.flow.DefaultFlowEntry;
46import org.onosproject.net.flow.FlowEntry;
47import org.onosproject.net.flow.FlowId;
48import org.onosproject.net.flow.FlowRule;
Yoonseon Hane026a7f2017-08-23 06:05:25 +090049import org.onosproject.net.flow.FlowRuleEvent;
50import org.onosproject.net.flow.FlowRuleService;
51import org.onosproject.net.flow.FlowRuleStoreDelegate;
52import org.onosproject.net.flow.StoredFlowEntry;
53import org.onosproject.net.flow.TableStatisticsEntry;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070054import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
55import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
56import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
57import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Yoonseon Hane026a7f2017-08-23 06:05:25 +090058import org.onosproject.store.Timestamp;
59import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
60import org.onosproject.store.cluster.messaging.ClusterMessage;
61import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
62import org.onosproject.store.cluster.messaging.MessageSubject;
63import org.onosproject.store.serializers.KryoNamespaces;
64import org.onosproject.store.service.EventuallyConsistentMap;
65import org.onosproject.store.service.EventuallyConsistentMapEvent;
66import org.onosproject.store.service.EventuallyConsistentMapListener;
67import org.onosproject.store.service.Serializer;
68import org.onosproject.store.service.StorageService;
69import org.onosproject.store.service.WallClockTimestamp;
70import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070071import org.osgi.service.component.annotations.Activate;
72import org.osgi.service.component.annotations.Component;
73import org.osgi.service.component.annotations.Deactivate;
74import org.osgi.service.component.annotations.Modified;
75import org.osgi.service.component.annotations.Reference;
76import org.osgi.service.component.annotations.ReferenceCardinality;
Yoonseon Hane026a7f2017-08-23 06:05:25 +090077import org.slf4j.Logger;
78
79import java.util.Collections;
80import java.util.Dictionary;
81import java.util.HashSet;
82import java.util.List;
83import java.util.Map;
84import java.util.Objects;
85import java.util.Set;
86import java.util.concurrent.ExecutorService;
87import java.util.concurrent.Executors;
88import java.util.concurrent.TimeUnit;
89import java.util.concurrent.atomic.AtomicInteger;
90import java.util.concurrent.atomic.AtomicReference;
91import java.util.stream.Collectors;
92
93import static com.google.common.base.Preconditions.checkArgument;
94import static com.google.common.base.Strings.isNullOrEmpty;
95import static org.onlab.util.Tools.get;
96import static org.onlab.util.Tools.groupedThreads;
97import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
98import static org.slf4j.LoggerFactory.getLogger;
99
100/**
101 * Manages inventory of flow rules using a distributed state management protocol
102 * for virtual networks.
103 */
104//TODO: support backup and persistent mechanism
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700105@Component(immediate = true, enabled = false, service = VirtualNetworkFlowRuleStore.class)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900106public class DistributedVirtualFlowRuleStore
107 extends AbstractVirtualStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
108 implements VirtualNetworkFlowRuleStore {
109
110 private final Logger log = getLogger(getClass());
111
112 //TODO: confirm this working fine with multiple thread more than 1
113 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
114 private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
115 private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
116 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
117
118 private static final String FLOW_OP_TOPIC = "virtual-flow-ops-ids";
119
120 // MessageSubjects used by DistributedVirtualFlowRuleStore peer-peer communication.
121 private static final MessageSubject APPLY_BATCH_FLOWS
122 = new MessageSubject("virtual-peer-forward-apply-batch");
123 private static final MessageSubject GET_FLOW_ENTRY
124 = new MessageSubject("virtual-peer-forward-get-flow-entry");
125 private static final MessageSubject GET_DEVICE_FLOW_ENTRIES
126 = new MessageSubject("virtual-peer-forward-get-device-flow-entries");
127 private static final MessageSubject REMOVE_FLOW_ENTRY
128 = new MessageSubject("virtual-peer-forward-remove-flow-entry");
129 private static final MessageSubject REMOTE_APPLY_COMPLETED
130 = new MessageSubject("virtual-peer-apply-completed");
131
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700132 //@Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
133 // label = "Number of threads in the message handler pool")
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900134 private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
135
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700136 //@Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
137 // label = "Delay in ms between successive backup runs")
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900138 private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700139 //@Property(name = "persistenceEnabled", boolValue = false,
140 // label = "Indicates whether or not changes in the flow table should be persisted to disk.")
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900141 private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
142
143 private InternalFlowTable flowTable = new InternalFlowTable();
144
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700145 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900146 protected CoreService coreService;
147
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700148 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900149 protected ClusterService clusterService;
150
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700151 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900152 protected ClusterCommunicationService clusterCommunicator;
153
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700154 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900155 protected ComponentConfigService configService;
156
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700157 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900158 protected StorageService storageService;
159
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700160 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900161 protected VirtualNetworkService vnaService;
162
163 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
164 private ExecutorService messageHandlingExecutor;
165 private ExecutorService eventHandler;
166
167 private EventuallyConsistentMap<NetworkId, Map<DeviceId, List<TableStatisticsEntry>>> deviceTableStats;
168 private final EventuallyConsistentMapListener<NetworkId, Map<DeviceId, List<TableStatisticsEntry>>>
169 tableStatsListener = new InternalTableStatsListener();
170
171
172 protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
173 .register(KryoNamespaces.API)
174 .register(NetworkId.class)
175 .register(VirtualFlowRuleBatchOperation.class)
176 .register(VirtualFlowRuleBatchEvent.class)
177 .build());
178
179 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
180 .register(KryoNamespaces.API)
181 .register(MastershipBasedTimestamp.class);
182
183 private IdGenerator idGenerator;
184 private NodeId local;
185
186
187 @Activate
188 public void activate(ComponentContext context) {
189 configService.registerProperties(getClass());
190
191 idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
192
193 local = clusterService.getLocalNode().id();
194
195 eventHandler = Executors.newSingleThreadExecutor(
196 groupedThreads("onos/virtual-flow", "event-handler", log));
197 messageHandlingExecutor = Executors.newFixedThreadPool(
198 msgHandlerPoolSize, groupedThreads("onos/store/virtual-flow", "message-handlers", log));
199
200 registerMessageHandlers(messageHandlingExecutor);
201
202 deviceTableStats = storageService
203 .<NetworkId, Map<DeviceId, List<TableStatisticsEntry>>>eventuallyConsistentMapBuilder()
204 .withName("onos-virtual-flow-table-stats")
205 .withSerializer(serializerBuilder)
206 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
207 .withTimestampProvider((k, v) -> new WallClockTimestamp())
208 .withTombstonesDisabled()
209 .build();
210 deviceTableStats.addListener(tableStatsListener);
211
212 logConfig("Started");
213 }
214
215 @Deactivate
216 public void deactivate(ComponentContext context) {
217 configService.unregisterProperties(getClass(), false);
218 unregisterMessageHandlers();
219 deviceTableStats.removeListener(tableStatsListener);
220 deviceTableStats.destroy();
221 eventHandler.shutdownNow();
222 messageHandlingExecutor.shutdownNow();
223 log.info("Stopped");
224 }
225
226 @SuppressWarnings("rawtypes")
227 @Modified
228 public void modified(ComponentContext context) {
229 if (context == null) {
230 logConfig("Default config");
231 return;
232 }
233
234 Dictionary properties = context.getProperties();
235 int newPoolSize;
236 int newBackupPeriod;
237 try {
238 String s = get(properties, "msgHandlerPoolSize");
239 newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
240
241 s = get(properties, "backupPeriod");
242 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
243 } catch (NumberFormatException | ClassCastException e) {
244 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
245 newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
246 }
247
248 boolean restartBackupTask = false;
249
250 if (newBackupPeriod != backupPeriod) {
251 backupPeriod = newBackupPeriod;
252 restartBackupTask = true;
253 }
254 if (restartBackupTask) {
255 log.warn("Currently, backup tasks are not supported.");
256 }
257 if (newPoolSize != msgHandlerPoolSize) {
258 msgHandlerPoolSize = newPoolSize;
259 ExecutorService oldMsgHandler = messageHandlingExecutor;
260 messageHandlingExecutor = Executors.newFixedThreadPool(
261 msgHandlerPoolSize, groupedThreads("onos/store/virtual-flow", "message-handlers", log));
262
263 // replace previously registered handlers.
264 registerMessageHandlers(messageHandlingExecutor);
265 oldMsgHandler.shutdown();
266 }
267
268 logConfig("Reconfigured");
269 }
270
271 @Override
272 public int getFlowRuleCount(NetworkId networkId) {
273 AtomicInteger sum = new AtomicInteger(0);
274 DeviceService deviceService = vnaService.get(networkId, DeviceService.class);
275 deviceService.getDevices()
276 .forEach(device -> sum.addAndGet(
277 Iterables.size(getFlowEntries(networkId, device.id()))));
278 return sum.get();
279 }
280
281 @Override
282 public FlowEntry getFlowEntry(NetworkId networkId, FlowRule rule) {
283 MastershipService mastershipService =
284 vnaService.get(networkId, MastershipService.class);
285 NodeId master = mastershipService.getMasterFor(rule.deviceId());
286
287 if (master == null) {
288 log.debug("Failed to getFlowEntry: No master for {}, vnet {}",
289 rule.deviceId(), networkId);
290 return null;
291 }
292
293 if (Objects.equals(local, master)) {
294 return flowTable.getFlowEntry(networkId, rule);
295 }
296
297 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) " +
298 "for device {}, vnet {}",
299 master, rule.deviceId(), networkId);
300
301 VirtualFlowRule vRule = new VirtualFlowRule(networkId, rule);
302
303 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(vRule,
304 GET_FLOW_ENTRY,
305 serializer::encode,
306 serializer::decode,
307 master),
308 FLOW_RULE_STORE_TIMEOUT_MILLIS,
309 TimeUnit.MILLISECONDS,
310 null);
311 }
312
313 @Override
314 public Iterable<FlowEntry> getFlowEntries(NetworkId networkId, DeviceId deviceId) {
315 MastershipService mastershipService =
316 vnaService.get(networkId, MastershipService.class);
317 NodeId master = mastershipService.getMasterFor(deviceId);
318
319 if (master == null) {
320 log.debug("Failed to getFlowEntries: No master for {}, vnet {}", deviceId, networkId);
321 return Collections.emptyList();
322 }
323
324 if (Objects.equals(local, master)) {
325 return flowTable.getFlowEntries(networkId, deviceId);
326 }
327
328 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
329 master, deviceId);
330
331 return Tools.futureGetOrElse(
332 clusterCommunicator.sendAndReceive(deviceId,
333 GET_DEVICE_FLOW_ENTRIES,
334 serializer::encode,
335 serializer::decode,
336 master),
337 FLOW_RULE_STORE_TIMEOUT_MILLIS,
338 TimeUnit.MILLISECONDS,
339 Collections.emptyList());
340 }
341
342 @Override
343 public void storeBatch(NetworkId networkId, FlowRuleBatchOperation operation) {
344 if (operation.getOperations().isEmpty()) {
345 notifyDelegate(networkId, FlowRuleBatchEvent.completed(
346 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
347 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
348 return;
349 }
350
351 DeviceId deviceId = operation.deviceId();
352 MastershipService mastershipService =
353 vnaService.get(networkId, MastershipService.class);
354 NodeId master = mastershipService.getMasterFor(deviceId);
355
356 if (master == null) {
357 log.warn("No master for {}, vnet {} : flows will be marked for removal", deviceId, networkId);
358
359 updateStoreInternal(networkId, operation);
360
361 notifyDelegate(networkId, FlowRuleBatchEvent.completed(
362 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
363 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
364 return;
365 }
366
367 if (Objects.equals(local, master)) {
368 storeBatchInternal(networkId, operation);
369 return;
370 }
371
372 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}, vent {}",
373 master, deviceId, networkId);
374
375 clusterCommunicator.unicast(new VirtualFlowRuleBatchOperation(networkId, operation),
376 APPLY_BATCH_FLOWS,
377 serializer::encode,
378 master)
379 .whenComplete((result, error) -> {
380 if (error != null) {
381 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
382
383 Set<FlowRule> allFailures = operation.getOperations()
384 .stream()
385 .map(BatchOperationEntry::target)
386 .collect(Collectors.toSet());
387
388 notifyDelegate(networkId, FlowRuleBatchEvent.completed(
389 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
390 new CompletedBatchOperation(false, allFailures, deviceId)));
391 }
392 });
393 }
394
395 @Override
396 public void batchOperationComplete(NetworkId networkId, FlowRuleBatchEvent event) {
397 //FIXME: need a per device pending response
398 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
399 if (nodeId == null) {
400 notifyDelegate(networkId, event);
401 } else {
402 // TODO check unicast return value
403 clusterCommunicator.unicast(new VirtualFlowRuleBatchEvent(networkId, event),
404 REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
405 //error log: log.warn("Failed to respond to peer for batch operation result");
406 }
407 }
408
409 @Override
410 public void deleteFlowRule(NetworkId networkId, FlowRule rule) {
411 storeBatch(networkId,
412 new FlowRuleBatchOperation(
413 Collections.singletonList(
414 new FlowRuleBatchEntry(
415 FlowRuleBatchEntry.FlowRuleOperation.REMOVE,
416 rule)), rule.deviceId(), idGenerator.getNewId()));
417 }
418
419 @Override
420 public FlowRuleEvent addOrUpdateFlowRule(NetworkId networkId, FlowEntry rule) {
421 MastershipService mastershipService =
422 vnaService.get(networkId, MastershipService.class);
423 NodeId master = mastershipService.getMasterFor(rule.deviceId());
424 if (Objects.equals(local, master)) {
425 return addOrUpdateFlowRuleInternal(networkId, rule);
426 }
427
428 log.warn("Tried to update FlowRule {} state,"
429 + " while the Node was not the master.", rule);
430 return null;
431 }
432
433 private FlowRuleEvent addOrUpdateFlowRuleInternal(NetworkId networkId, FlowEntry rule) {
434 // check if this new rule is an update to an existing entry
435 StoredFlowEntry stored = flowTable.getFlowEntry(networkId, rule);
436 if (stored != null) {
437 //FIXME modification of "stored" flow entry outside of flow table
438 stored.setBytes(rule.bytes());
439 stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
440 stored.setLiveType(rule.liveType());
441 stored.setPackets(rule.packets());
442 stored.setLastSeen();
443 if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
444 stored.setState(FlowEntry.FlowEntryState.ADDED);
445 return new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, rule);
446 }
447 return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule);
448 }
449
450 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
451 // TODO: also update backup if the behavior is correct.
452 flowTable.add(networkId, rule);
453 return null;
454 }
455
456 @Override
457 public FlowRuleEvent removeFlowRule(NetworkId networkId, FlowEntry rule) {
458 final DeviceId deviceId = rule.deviceId();
459
460 MastershipService mastershipService =
461 vnaService.get(networkId, MastershipService.class);
462 NodeId master = mastershipService.getMasterFor(deviceId);
463
464 if (Objects.equals(local, master)) {
465 // bypass and handle it locally
466 return removeFlowRuleInternal(new VirtualFlowEntry(networkId, rule));
467 }
468
469 if (master == null) {
470 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
471 // TODO: revisit if this should be null (="no-op") or Exception
472 return null;
473 }
474
475 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
476 master, deviceId);
477
478 return Futures.getUnchecked(clusterCommunicator.sendAndReceive(
479 new VirtualFlowEntry(networkId, rule),
480 REMOVE_FLOW_ENTRY,
481 serializer::encode,
482 serializer::decode,
483 master));
484 }
485
486 @Override
487 public FlowRuleEvent pendingFlowRule(NetworkId networkId, FlowEntry rule) {
488 MastershipService mastershipService =
489 vnaService.get(networkId, MastershipService.class);
490 if (mastershipService.isLocalMaster(rule.deviceId())) {
491 StoredFlowEntry stored = flowTable.getFlowEntry(networkId, rule);
492 if (stored != null &&
493 stored.state() != FlowEntry.FlowEntryState.PENDING_ADD) {
494 stored.setState(FlowEntry.FlowEntryState.PENDING_ADD);
495 return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule);
496 }
497 }
498 return null;
499 }
500
501 @Override
502 public void purgeFlowRules(NetworkId networkId) {
503 flowTable.purgeFlowRules(networkId);
504 }
505
506 @Override
507 public FlowRuleEvent updateTableStatistics(NetworkId networkId,
508 DeviceId deviceId,
509 List<TableStatisticsEntry> tableStats) {
510 if (deviceTableStats.get(networkId) == null) {
511 deviceTableStats.put(networkId, Maps.newConcurrentMap());
512 }
513 deviceTableStats.get(networkId).put(deviceId, tableStats);
514 return null;
515 }
516
517 @Override
518 public Iterable<TableStatisticsEntry> getTableStatistics(NetworkId networkId, DeviceId deviceId) {
519 MastershipService mastershipService =
520 vnaService.get(networkId, MastershipService.class);
521 NodeId master = mastershipService.getMasterFor(deviceId);
522
523 if (master == null) {
524 log.debug("Failed to getTableStats: No master for {}", deviceId);
525 return Collections.emptyList();
526 }
527
528 if (deviceTableStats.get(networkId) == null) {
529 deviceTableStats.put(networkId, Maps.newConcurrentMap());
530 }
531
532 List<TableStatisticsEntry> tableStats = deviceTableStats.get(networkId).get(deviceId);
533 if (tableStats == null) {
534 return Collections.emptyList();
535 }
536 return ImmutableList.copyOf(tableStats);
537 }
538
539 private void registerMessageHandlers(ExecutorService executor) {
540 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
541 clusterCommunicator.<VirtualFlowRuleBatchEvent>addSubscriber(
542 REMOTE_APPLY_COMPLETED, serializer::decode,
543 this::notifyDelicateByNetwork, executor);
544 clusterCommunicator.addSubscriber(
545 GET_FLOW_ENTRY, serializer::decode, this::getFlowEntryByNetwork,
546 serializer::encode, executor);
547 clusterCommunicator.addSubscriber(
548 GET_DEVICE_FLOW_ENTRIES, serializer::decode,
549 this::getFlowEntriesByNetwork,
550 serializer::encode, executor);
551 clusterCommunicator.addSubscriber(
552 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal,
553 serializer::encode, executor);
554 }
555
556 private void unregisterMessageHandlers() {
557 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
558 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
559 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
560 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
561 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
562 }
563
564
565 private void logConfig(String prefix) {
566 log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}",
567 prefix, msgHandlerPoolSize, backupPeriod);
568 }
569
570 private void storeBatchInternal(NetworkId networkId, FlowRuleBatchOperation operation) {
571
572 final DeviceId did = operation.deviceId();
573 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
574 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(networkId, operation);
575 if (currentOps.isEmpty()) {
576 batchOperationComplete(networkId, FlowRuleBatchEvent.completed(
577 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
578 new CompletedBatchOperation(true, Collections.emptySet(), did)));
579 return;
580 }
581
582 //Confirm that flowrule service is created
583 vnaService.get(networkId, FlowRuleService.class);
584
585 notifyDelegate(networkId, FlowRuleBatchEvent.requested(new
586 FlowRuleBatchRequest(operation.id(),
587 currentOps), operation.deviceId()));
588 }
589
590 private Set<FlowRuleBatchEntry> updateStoreInternal(NetworkId networkId,
591 FlowRuleBatchOperation operation) {
592 return operation.getOperations().stream().map(
593 op -> {
594 StoredFlowEntry entry;
595 switch (op.operator()) {
596 case ADD:
597 entry = new DefaultFlowEntry(op.target());
598 // always add requested FlowRule
599 // Note: 2 equal FlowEntry may have different treatment
600 flowTable.remove(networkId, entry.deviceId(), entry);
601 flowTable.add(networkId, entry);
602
603 return op;
604 case REMOVE:
605 entry = flowTable.getFlowEntry(networkId, op.target());
606 if (entry != null) {
607 //FIXME modification of "stored" flow entry outside of flow table
608 entry.setState(FlowEntry.FlowEntryState.PENDING_REMOVE);
609 log.debug("Setting state of rule to pending remove: {}", entry);
610 return op;
611 }
612 break;
613 case MODIFY:
614 //TODO: figure this out at some point
615 break;
616 default:
617 log.warn("Unknown flow operation operator: {}", op.operator());
618 }
619 return null;
620 }
621 ).filter(Objects::nonNull).collect(Collectors.toSet());
622 }
623
624 private FlowRuleEvent removeFlowRuleInternal(VirtualFlowEntry rule) {
625 final DeviceId deviceId = rule.flowEntry().deviceId();
626 // This is where one could mark a rule as removed and still keep it in the store.
627 final FlowEntry removed = flowTable.remove(rule.networkId(), deviceId, rule.flowEntry());
628 // rule may be partial rule that is missing treatment, we should use rule from store instead
629 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
630 }
631
632 private final class OnStoreBatch implements ClusterMessageHandler {
633
634 @Override
635 public void handle(final ClusterMessage message) {
636 VirtualFlowRuleBatchOperation vOperation = serializer.decode(message.payload());
637 log.debug("received batch request {}", vOperation);
638
639 FlowRuleBatchOperation operation = vOperation.operation();
640
641 final DeviceId deviceId = operation.deviceId();
642 MastershipService mastershipService =
643 vnaService.get(vOperation.networkId(), MastershipService.class);
644 NodeId master = mastershipService.getMasterFor(deviceId);
645 if (!Objects.equals(local, master)) {
646 Set<FlowRule> failures = new HashSet<>(operation.size());
647 for (FlowRuleBatchEntry op : operation.getOperations()) {
648 failures.add(op.target());
649 }
650 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
651 // This node is no longer the master, respond as all failed.
652 // TODO: we might want to wrap response in envelope
653 // to distinguish sw programming failure and hand over
654 // it make sense in the latter case to retry immediately.
655 message.respond(serializer.encode(allFailed));
656 return;
657 }
658
659 pendingResponses.put(operation.id(), message.sender());
660 storeBatchInternal(vOperation.networkId(), operation);
661 }
662 }
663
664 /**
665 * Returns flow rule entry using virtual flow rule.
666 *
667 * @param rule an encapsulated flow rule to be queried
668 */
669 private FlowEntry getFlowEntryByNetwork(VirtualFlowRule rule) {
670 return flowTable.getFlowEntry(rule.networkId(), rule.rule());
671 }
672
673 /**
674 * returns flow entries using virtual device id.
675 *
676 * @param deviceId an encapsulated virtual device id
677 * @return a set of flow entries
678 */
679 private Set<FlowEntry> getFlowEntriesByNetwork(VirtualDeviceId deviceId) {
680 return flowTable.getFlowEntries(deviceId.networkId(), deviceId.deviceId());
681 }
682
683 /**
684 * span out Flow Rule Batch event according to virtual network id.
685 *
686 * @param event a event to be span out
687 */
688 private void notifyDelicateByNetwork(VirtualFlowRuleBatchEvent event) {
689 batchOperationComplete(event.networkId(), event.event());
690 }
691
692 private class InternalFlowTable {
693 //TODO replace the Map<V,V> with ExtendedSet
694 //TODO: support backup mechanism
695 private final Map<NetworkId, Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>>
696 flowEntriesMap = Maps.newConcurrentMap();
697 private final Map<NetworkId, Map<DeviceId, Long>> lastUpdateTimesMap = Maps.newConcurrentMap();
698
699 private Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
700 getFlowEntriesByNetwork(NetworkId networkId) {
701 return flowEntriesMap.computeIfAbsent(networkId, k -> Maps.newConcurrentMap());
702 }
703
704 private Map<DeviceId, Long> getLastUpdateTimesByNetwork(NetworkId networkId) {
705 return lastUpdateTimesMap.computeIfAbsent(networkId, k -> Maps.newConcurrentMap());
706 }
707
708 /**
709 * Returns the flow table for specified device.
710 *
711 * @param networkId virtual network identifier
712 * @param deviceId identifier of the device
713 * @return Map representing Flow Table of given device.
714 */
715 private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>
716 getFlowTable(NetworkId networkId, DeviceId deviceId) {
717 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
718 flowEntries = getFlowEntriesByNetwork(networkId);
719 if (persistenceEnabled) {
720 //TODO: support persistent
721 log.warn("Persistent is not supported");
722 return null;
723 } else {
724 return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
725 }
726 }
727
728 private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>
729 getFlowTableCopy(NetworkId networkId, DeviceId deviceId) {
730
731 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
732 flowEntries = getFlowEntriesByNetwork(networkId);
733 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = Maps.newHashMap();
734
735 if (persistenceEnabled) {
736 //TODO: support persistent
737 log.warn("Persistent is not supported");
738 return null;
739 } else {
740 flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap()).forEach((k, v) -> {
741 copy.put(k, Maps.newHashMap(v));
742 });
743 return copy;
744 }
745 }
746
747 private Map<StoredFlowEntry, StoredFlowEntry>
748 getFlowEntriesInternal(NetworkId networkId, DeviceId deviceId, FlowId flowId) {
749
750 return getFlowTable(networkId, deviceId)
751 .computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
752 }
753
754 private StoredFlowEntry getFlowEntryInternal(NetworkId networkId, FlowRule rule) {
755
756 return getFlowEntriesInternal(networkId, rule.deviceId(), rule.id()).get(rule);
757 }
758
759 private Set<FlowEntry> getFlowEntriesInternal(NetworkId networkId, DeviceId deviceId) {
760
761 return getFlowTable(networkId, deviceId).values().stream()
762 .flatMap(m -> m.values().stream())
763 .collect(Collectors.toSet());
764 }
765
766 public StoredFlowEntry getFlowEntry(NetworkId networkId, FlowRule rule) {
767 return getFlowEntryInternal(networkId, rule);
768 }
769
770 public Set<FlowEntry> getFlowEntries(NetworkId networkId, DeviceId deviceId) {
771
772 return getFlowEntriesInternal(networkId, deviceId);
773 }
774
775 public void add(NetworkId networkId, FlowEntry rule) {
776 Map<DeviceId, Long> lastUpdateTimes = getLastUpdateTimesByNetwork(networkId);
777
778 getFlowEntriesInternal(networkId, rule.deviceId(), rule.id())
779 .compute((StoredFlowEntry) rule, (k, stored) -> {
780 //TODO compare stored and rule timestamps
781 //TODO the key is not updated
782 return (StoredFlowEntry) rule;
783 });
784 lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
785 }
786
787 public FlowEntry remove(NetworkId networkId, DeviceId deviceId, FlowEntry rule) {
788 final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
789 Map<DeviceId, Long> lastUpdateTimes = getLastUpdateTimesByNetwork(networkId);
790
791 getFlowEntriesInternal(networkId, rule.deviceId(), rule.id())
792 .computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
793 if (rule instanceof DefaultFlowEntry) {
794 DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
795 if (stored instanceof DefaultFlowEntry) {
796 DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
797 if (toRemove.created() < storedEntry.created()) {
798 log.debug("Trying to remove more recent flow entry {} (stored: {})",
799 toRemove, stored);
800 // the key is not updated, removedRule remains null
801 return stored;
802 }
803 }
804 }
805 removedRule.set(stored);
806 return null;
807 });
808
809 if (removedRule.get() != null) {
810 lastUpdateTimes.put(deviceId, System.currentTimeMillis());
811 return removedRule.get();
812 } else {
813 return null;
814 }
815 }
816
817 public void purgeFlowRule(NetworkId networkId, DeviceId deviceId) {
818 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
819 flowEntries = getFlowEntriesByNetwork(networkId);
820 flowEntries.remove(deviceId);
821 }
822
823 public void purgeFlowRules(NetworkId networkId) {
824 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
825 flowEntries = getFlowEntriesByNetwork(networkId);
826 flowEntries.clear();
827 }
828 }
829
830 private class InternalTableStatsListener
831 implements EventuallyConsistentMapListener<NetworkId, Map<DeviceId, List<TableStatisticsEntry>>> {
832
833 @Override
834 public void event(EventuallyConsistentMapEvent<NetworkId, Map<DeviceId,
835 List<TableStatisticsEntry>>> event) {
836 //TODO: Generate an event to listeners (do we need?)
837 }
838 }
839
840 public final class MastershipBasedTimestamp implements Timestamp {
841
842 private final long termNumber;
843 private final long sequenceNumber;
844
845 /**
846 * Default constructor for serialization.
847 */
848 protected MastershipBasedTimestamp() {
849 this.termNumber = -1;
850 this.sequenceNumber = -1;
851 }
852
853 /**
854 * Default version tuple.
855 *
856 * @param termNumber the mastership termNumber
857 * @param sequenceNumber the sequenceNumber number within the termNumber
858 */
859 public MastershipBasedTimestamp(long termNumber, long sequenceNumber) {
860 this.termNumber = termNumber;
861 this.sequenceNumber = sequenceNumber;
862 }
863
864 @Override
865 public int compareTo(Timestamp o) {
866 checkArgument(o instanceof MastershipBasedTimestamp,
867 "Must be MastershipBasedTimestamp", o);
868 MastershipBasedTimestamp that = (MastershipBasedTimestamp) o;
869
870 return ComparisonChain.start()
871 .compare(this.termNumber, that.termNumber)
872 .compare(this.sequenceNumber, that.sequenceNumber)
873 .result();
874 }
875
876 @Override
877 public int hashCode() {
878 return Objects.hash(termNumber, sequenceNumber);
879 }
880
881 @Override
882 public boolean equals(Object obj) {
883 if (this == obj) {
884 return true;
885 }
886 if (!(obj instanceof MastershipBasedTimestamp)) {
887 return false;
888 }
889 MastershipBasedTimestamp that = (MastershipBasedTimestamp) obj;
890 return Objects.equals(this.termNumber, that.termNumber) &&
891 Objects.equals(this.sequenceNumber, that.sequenceNumber);
892 }
893
894 @Override
895 public String toString() {
896 return MoreObjects.toStringHelper(getClass())
897 .add("termNumber", termNumber)
898 .add("sequenceNumber", sequenceNumber)
899 .toString();
900 }
901
902 /**
903 * Returns the termNumber.
904 *
905 * @return termNumber
906 */
907 public long termNumber() {
908 return termNumber;
909 }
910
911 /**
912 * Returns the sequenceNumber number.
913 *
914 * @return sequenceNumber
915 */
916 public long sequenceNumber() {
917 return sequenceNumber;
918 }
919 }
920}