blob: 61ea50619c711b7d70c78f2925456e1472640c62 [file] [log] [blame]
Yoonseon Hane026a7f2017-08-23 06:05:25 +09001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.incubator.store.virtual.impl;
18
19import com.google.common.base.MoreObjects;
20import com.google.common.collect.ComparisonChain;
21import com.google.common.collect.ImmutableList;
22import com.google.common.collect.Iterables;
23import com.google.common.collect.Maps;
24import com.google.common.util.concurrent.Futures;
Yoonseon Hane026a7f2017-08-23 06:05:25 +090025import org.onlab.util.KryoNamespace;
26import org.onlab.util.Tools;
27import org.onosproject.cfg.ComponentConfigService;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.core.CoreService;
31import org.onosproject.core.IdGenerator;
32import org.onosproject.incubator.net.virtual.NetworkId;
33import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
34import org.onosproject.incubator.net.virtual.VirtualNetworkService;
35import org.onosproject.incubator.store.virtual.impl.primitives.VirtualDeviceId;
36import org.onosproject.incubator.store.virtual.impl.primitives.VirtualFlowEntry;
37import org.onosproject.incubator.store.virtual.impl.primitives.VirtualFlowRule;
38import org.onosproject.incubator.store.virtual.impl.primitives.VirtualFlowRuleBatchEvent;
39import org.onosproject.incubator.store.virtual.impl.primitives.VirtualFlowRuleBatchOperation;
40import org.onosproject.mastership.MastershipService;
41import org.onosproject.net.DeviceId;
42import org.onosproject.net.device.DeviceService;
43import org.onosproject.net.flow.BatchOperationEntry;
44import org.onosproject.net.flow.CompletedBatchOperation;
45import org.onosproject.net.flow.DefaultFlowEntry;
46import org.onosproject.net.flow.FlowEntry;
47import org.onosproject.net.flow.FlowId;
48import org.onosproject.net.flow.FlowRule;
Yoonseon Hane026a7f2017-08-23 06:05:25 +090049import org.onosproject.net.flow.FlowRuleEvent;
50import org.onosproject.net.flow.FlowRuleService;
51import org.onosproject.net.flow.FlowRuleStoreDelegate;
52import org.onosproject.net.flow.StoredFlowEntry;
53import org.onosproject.net.flow.TableStatisticsEntry;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070054import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
55import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
56import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
57import org.onosproject.net.flow.oldbatch.FlowRuleBatchRequest;
Yoonseon Hane026a7f2017-08-23 06:05:25 +090058import org.onosproject.store.Timestamp;
59import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
60import org.onosproject.store.cluster.messaging.ClusterMessage;
61import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
62import org.onosproject.store.cluster.messaging.MessageSubject;
63import org.onosproject.store.serializers.KryoNamespaces;
64import org.onosproject.store.service.EventuallyConsistentMap;
65import org.onosproject.store.service.EventuallyConsistentMapEvent;
66import org.onosproject.store.service.EventuallyConsistentMapListener;
67import org.onosproject.store.service.Serializer;
68import org.onosproject.store.service.StorageService;
69import org.onosproject.store.service.WallClockTimestamp;
70import org.osgi.service.component.ComponentContext;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070071import org.osgi.service.component.annotations.Activate;
72import org.osgi.service.component.annotations.Component;
73import org.osgi.service.component.annotations.Deactivate;
74import org.osgi.service.component.annotations.Modified;
75import org.osgi.service.component.annotations.Reference;
76import org.osgi.service.component.annotations.ReferenceCardinality;
Yoonseon Hane026a7f2017-08-23 06:05:25 +090077import org.slf4j.Logger;
78
79import java.util.Collections;
80import java.util.Dictionary;
81import java.util.HashSet;
82import java.util.List;
83import java.util.Map;
84import java.util.Objects;
85import java.util.Set;
86import java.util.concurrent.ExecutorService;
87import java.util.concurrent.Executors;
88import java.util.concurrent.TimeUnit;
89import java.util.concurrent.atomic.AtomicInteger;
90import java.util.concurrent.atomic.AtomicReference;
91import java.util.stream.Collectors;
92
93import static com.google.common.base.Preconditions.checkArgument;
94import static com.google.common.base.Strings.isNullOrEmpty;
95import static org.onlab.util.Tools.get;
96import static org.onlab.util.Tools.groupedThreads;
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -070097import static org.onosproject.incubator.store.virtual.impl.OsgiPropertyConstants.*;
Yoonseon Hane026a7f2017-08-23 06:05:25 +090098import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
99import static org.slf4j.LoggerFactory.getLogger;
100
101/**
102 * Manages inventory of flow rules using a distributed state management protocol
103 * for virtual networks.
104 */
105//TODO: support backup and persistent mechanism
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700106@Component(immediate = true, enabled = false, service = VirtualNetworkFlowRuleStore.class,
107 property = {
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -0700108 MESSAGE_HANDLER_THREAD_POOL_SIZE + ":Integer=" + MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT,
109 BACKUP_PERIOD_MILLIS + ":Integer=" + BACKUP_PERIOD_MILLIS_DEFAULT,
110 PERSISTENCE_ENABLED + ":Boolean=" + PERSISTENCE_ENABLED_DEFAULT,
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700111 })
112
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900113public class DistributedVirtualFlowRuleStore
114 extends AbstractVirtualStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
115 implements VirtualNetworkFlowRuleStore {
116
117 private final Logger log = getLogger(getClass());
118
119 //TODO: confirm this working fine with multiple thread more than 1
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900120 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
121
122 private static final String FLOW_OP_TOPIC = "virtual-flow-ops-ids";
123
124 // MessageSubjects used by DistributedVirtualFlowRuleStore peer-peer communication.
125 private static final MessageSubject APPLY_BATCH_FLOWS
126 = new MessageSubject("virtual-peer-forward-apply-batch");
127 private static final MessageSubject GET_FLOW_ENTRY
128 = new MessageSubject("virtual-peer-forward-get-flow-entry");
129 private static final MessageSubject GET_DEVICE_FLOW_ENTRIES
130 = new MessageSubject("virtual-peer-forward-get-device-flow-entries");
131 private static final MessageSubject REMOVE_FLOW_ENTRY
132 = new MessageSubject("virtual-peer-forward-remove-flow-entry");
133 private static final MessageSubject REMOTE_APPLY_COMPLETED
134 = new MessageSubject("virtual-peer-apply-completed");
135
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -0700136 /** Number of threads in the message handler pool. */
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700137 private int msgHandlerThreadPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900138
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -0700139 /** Delay in ms between successive backup runs. */
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700140 private int backupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -0700141
142 /** Indicates whether or not changes in the flow table should be persisted to disk.. */
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700143 private boolean persistenceEnabled = PERSISTENCE_ENABLED_DEFAULT;
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900144
145 private InternalFlowTable flowTable = new InternalFlowTable();
146
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700147 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900148 protected CoreService coreService;
149
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700150 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900151 protected ClusterService clusterService;
152
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700153 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900154 protected ClusterCommunicationService clusterCommunicator;
155
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700156 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900157 protected ComponentConfigService configService;
158
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700159 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900160 protected StorageService storageService;
161
Ray Milkeyd84f89b2018-08-17 14:54:17 -0700162 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900163 protected VirtualNetworkService vnaService;
164
165 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
166 private ExecutorService messageHandlingExecutor;
167 private ExecutorService eventHandler;
168
169 private EventuallyConsistentMap<NetworkId, Map<DeviceId, List<TableStatisticsEntry>>> deviceTableStats;
170 private final EventuallyConsistentMapListener<NetworkId, Map<DeviceId, List<TableStatisticsEntry>>>
171 tableStatsListener = new InternalTableStatsListener();
172
173
174 protected final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
175 .register(KryoNamespaces.API)
176 .register(NetworkId.class)
177 .register(VirtualFlowRuleBatchOperation.class)
178 .register(VirtualFlowRuleBatchEvent.class)
179 .build());
180
181 protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
182 .register(KryoNamespaces.API)
183 .register(MastershipBasedTimestamp.class);
184
185 private IdGenerator idGenerator;
186 private NodeId local;
187
188
189 @Activate
190 public void activate(ComponentContext context) {
191 configService.registerProperties(getClass());
192
193 idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
194
195 local = clusterService.getLocalNode().id();
196
197 eventHandler = Executors.newSingleThreadExecutor(
198 groupedThreads("onos/virtual-flow", "event-handler", log));
199 messageHandlingExecutor = Executors.newFixedThreadPool(
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700200 msgHandlerThreadPoolSize, groupedThreads("onos/store/virtual-flow", "message-handlers", log));
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900201
202 registerMessageHandlers(messageHandlingExecutor);
203
204 deviceTableStats = storageService
205 .<NetworkId, Map<DeviceId, List<TableStatisticsEntry>>>eventuallyConsistentMapBuilder()
206 .withName("onos-virtual-flow-table-stats")
207 .withSerializer(serializerBuilder)
208 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
209 .withTimestampProvider((k, v) -> new WallClockTimestamp())
210 .withTombstonesDisabled()
211 .build();
212 deviceTableStats.addListener(tableStatsListener);
213
214 logConfig("Started");
215 }
216
217 @Deactivate
218 public void deactivate(ComponentContext context) {
219 configService.unregisterProperties(getClass(), false);
220 unregisterMessageHandlers();
221 deviceTableStats.removeListener(tableStatsListener);
222 deviceTableStats.destroy();
223 eventHandler.shutdownNow();
224 messageHandlingExecutor.shutdownNow();
225 log.info("Stopped");
226 }
227
228 @SuppressWarnings("rawtypes")
229 @Modified
230 public void modified(ComponentContext context) {
231 if (context == null) {
232 logConfig("Default config");
233 return;
234 }
235
236 Dictionary properties = context.getProperties();
237 int newPoolSize;
238 int newBackupPeriod;
239 try {
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -0700240 String s = get(properties, MESSAGE_HANDLER_THREAD_POOL_SIZE);
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700241 newPoolSize = isNullOrEmpty(s) ? msgHandlerThreadPoolSize : Integer.parseInt(s.trim());
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900242
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -0700243 s = get(properties, BACKUP_PERIOD_MILLIS);
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900244 newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
245 } catch (NumberFormatException | ClassCastException e) {
Thomas Vachuska00b5d4f2018-10-30 15:13:20 -0700246 newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE_DEFAULT;
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700247 newBackupPeriod = BACKUP_PERIOD_MILLIS_DEFAULT;
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900248 }
249
250 boolean restartBackupTask = false;
251
252 if (newBackupPeriod != backupPeriod) {
253 backupPeriod = newBackupPeriod;
254 restartBackupTask = true;
255 }
256 if (restartBackupTask) {
257 log.warn("Currently, backup tasks are not supported.");
258 }
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700259 if (newPoolSize != msgHandlerThreadPoolSize) {
260 msgHandlerThreadPoolSize = newPoolSize;
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900261 ExecutorService oldMsgHandler = messageHandlingExecutor;
262 messageHandlingExecutor = Executors.newFixedThreadPool(
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700263 msgHandlerThreadPoolSize, groupedThreads("onos/store/virtual-flow", "message-handlers", log));
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900264
265 // replace previously registered handlers.
266 registerMessageHandlers(messageHandlingExecutor);
267 oldMsgHandler.shutdown();
268 }
269
270 logConfig("Reconfigured");
271 }
272
273 @Override
274 public int getFlowRuleCount(NetworkId networkId) {
275 AtomicInteger sum = new AtomicInteger(0);
276 DeviceService deviceService = vnaService.get(networkId, DeviceService.class);
277 deviceService.getDevices()
278 .forEach(device -> sum.addAndGet(
279 Iterables.size(getFlowEntries(networkId, device.id()))));
280 return sum.get();
281 }
282
283 @Override
284 public FlowEntry getFlowEntry(NetworkId networkId, FlowRule rule) {
285 MastershipService mastershipService =
286 vnaService.get(networkId, MastershipService.class);
287 NodeId master = mastershipService.getMasterFor(rule.deviceId());
288
289 if (master == null) {
290 log.debug("Failed to getFlowEntry: No master for {}, vnet {}",
291 rule.deviceId(), networkId);
292 return null;
293 }
294
295 if (Objects.equals(local, master)) {
296 return flowTable.getFlowEntry(networkId, rule);
297 }
298
299 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) " +
300 "for device {}, vnet {}",
301 master, rule.deviceId(), networkId);
302
303 VirtualFlowRule vRule = new VirtualFlowRule(networkId, rule);
304
305 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(vRule,
306 GET_FLOW_ENTRY,
307 serializer::encode,
308 serializer::decode,
309 master),
310 FLOW_RULE_STORE_TIMEOUT_MILLIS,
311 TimeUnit.MILLISECONDS,
312 null);
313 }
314
315 @Override
316 public Iterable<FlowEntry> getFlowEntries(NetworkId networkId, DeviceId deviceId) {
317 MastershipService mastershipService =
318 vnaService.get(networkId, MastershipService.class);
319 NodeId master = mastershipService.getMasterFor(deviceId);
320
321 if (master == null) {
322 log.debug("Failed to getFlowEntries: No master for {}, vnet {}", deviceId, networkId);
323 return Collections.emptyList();
324 }
325
326 if (Objects.equals(local, master)) {
327 return flowTable.getFlowEntries(networkId, deviceId);
328 }
329
330 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
331 master, deviceId);
332
333 return Tools.futureGetOrElse(
334 clusterCommunicator.sendAndReceive(deviceId,
335 GET_DEVICE_FLOW_ENTRIES,
336 serializer::encode,
337 serializer::decode,
338 master),
339 FLOW_RULE_STORE_TIMEOUT_MILLIS,
340 TimeUnit.MILLISECONDS,
341 Collections.emptyList());
342 }
343
344 @Override
345 public void storeBatch(NetworkId networkId, FlowRuleBatchOperation operation) {
346 if (operation.getOperations().isEmpty()) {
347 notifyDelegate(networkId, FlowRuleBatchEvent.completed(
348 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
349 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
350 return;
351 }
352
353 DeviceId deviceId = operation.deviceId();
354 MastershipService mastershipService =
355 vnaService.get(networkId, MastershipService.class);
356 NodeId master = mastershipService.getMasterFor(deviceId);
357
358 if (master == null) {
359 log.warn("No master for {}, vnet {} : flows will be marked for removal", deviceId, networkId);
360
361 updateStoreInternal(networkId, operation);
362
363 notifyDelegate(networkId, FlowRuleBatchEvent.completed(
364 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
365 new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
366 return;
367 }
368
369 if (Objects.equals(local, master)) {
370 storeBatchInternal(networkId, operation);
371 return;
372 }
373
374 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}, vent {}",
375 master, deviceId, networkId);
376
377 clusterCommunicator.unicast(new VirtualFlowRuleBatchOperation(networkId, operation),
378 APPLY_BATCH_FLOWS,
379 serializer::encode,
380 master)
381 .whenComplete((result, error) -> {
382 if (error != null) {
383 log.warn("Failed to storeBatch: {} to {}", operation, master, error);
384
385 Set<FlowRule> allFailures = operation.getOperations()
386 .stream()
387 .map(BatchOperationEntry::target)
388 .collect(Collectors.toSet());
389
390 notifyDelegate(networkId, FlowRuleBatchEvent.completed(
391 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
392 new CompletedBatchOperation(false, allFailures, deviceId)));
393 }
394 });
395 }
396
397 @Override
398 public void batchOperationComplete(NetworkId networkId, FlowRuleBatchEvent event) {
399 //FIXME: need a per device pending response
400 NodeId nodeId = pendingResponses.remove(event.subject().batchId());
401 if (nodeId == null) {
402 notifyDelegate(networkId, event);
403 } else {
404 // TODO check unicast return value
405 clusterCommunicator.unicast(new VirtualFlowRuleBatchEvent(networkId, event),
406 REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
407 //error log: log.warn("Failed to respond to peer for batch operation result");
408 }
409 }
410
411 @Override
412 public void deleteFlowRule(NetworkId networkId, FlowRule rule) {
413 storeBatch(networkId,
414 new FlowRuleBatchOperation(
415 Collections.singletonList(
416 new FlowRuleBatchEntry(
417 FlowRuleBatchEntry.FlowRuleOperation.REMOVE,
418 rule)), rule.deviceId(), idGenerator.getNewId()));
419 }
420
421 @Override
422 public FlowRuleEvent addOrUpdateFlowRule(NetworkId networkId, FlowEntry rule) {
423 MastershipService mastershipService =
424 vnaService.get(networkId, MastershipService.class);
425 NodeId master = mastershipService.getMasterFor(rule.deviceId());
426 if (Objects.equals(local, master)) {
427 return addOrUpdateFlowRuleInternal(networkId, rule);
428 }
429
430 log.warn("Tried to update FlowRule {} state,"
431 + " while the Node was not the master.", rule);
432 return null;
433 }
434
435 private FlowRuleEvent addOrUpdateFlowRuleInternal(NetworkId networkId, FlowEntry rule) {
436 // check if this new rule is an update to an existing entry
437 StoredFlowEntry stored = flowTable.getFlowEntry(networkId, rule);
438 if (stored != null) {
439 //FIXME modification of "stored" flow entry outside of flow table
440 stored.setBytes(rule.bytes());
441 stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
442 stored.setLiveType(rule.liveType());
443 stored.setPackets(rule.packets());
444 stored.setLastSeen();
445 if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
446 stored.setState(FlowEntry.FlowEntryState.ADDED);
447 return new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, rule);
448 }
449 return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule);
450 }
451
452 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
453 // TODO: also update backup if the behavior is correct.
454 flowTable.add(networkId, rule);
455 return null;
456 }
457
458 @Override
459 public FlowRuleEvent removeFlowRule(NetworkId networkId, FlowEntry rule) {
460 final DeviceId deviceId = rule.deviceId();
461
462 MastershipService mastershipService =
463 vnaService.get(networkId, MastershipService.class);
464 NodeId master = mastershipService.getMasterFor(deviceId);
465
466 if (Objects.equals(local, master)) {
467 // bypass and handle it locally
468 return removeFlowRuleInternal(new VirtualFlowEntry(networkId, rule));
469 }
470
471 if (master == null) {
472 log.warn("Failed to removeFlowRule: No master for {}", deviceId);
473 // TODO: revisit if this should be null (="no-op") or Exception
474 return null;
475 }
476
477 log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
478 master, deviceId);
479
480 return Futures.getUnchecked(clusterCommunicator.sendAndReceive(
481 new VirtualFlowEntry(networkId, rule),
482 REMOVE_FLOW_ENTRY,
483 serializer::encode,
484 serializer::decode,
485 master));
486 }
487
488 @Override
489 public FlowRuleEvent pendingFlowRule(NetworkId networkId, FlowEntry rule) {
490 MastershipService mastershipService =
491 vnaService.get(networkId, MastershipService.class);
492 if (mastershipService.isLocalMaster(rule.deviceId())) {
493 StoredFlowEntry stored = flowTable.getFlowEntry(networkId, rule);
494 if (stored != null &&
495 stored.state() != FlowEntry.FlowEntryState.PENDING_ADD) {
496 stored.setState(FlowEntry.FlowEntryState.PENDING_ADD);
497 return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule);
498 }
499 }
500 return null;
501 }
502
503 @Override
504 public void purgeFlowRules(NetworkId networkId) {
505 flowTable.purgeFlowRules(networkId);
506 }
507
508 @Override
509 public FlowRuleEvent updateTableStatistics(NetworkId networkId,
510 DeviceId deviceId,
511 List<TableStatisticsEntry> tableStats) {
512 if (deviceTableStats.get(networkId) == null) {
513 deviceTableStats.put(networkId, Maps.newConcurrentMap());
514 }
515 deviceTableStats.get(networkId).put(deviceId, tableStats);
516 return null;
517 }
518
519 @Override
520 public Iterable<TableStatisticsEntry> getTableStatistics(NetworkId networkId, DeviceId deviceId) {
521 MastershipService mastershipService =
522 vnaService.get(networkId, MastershipService.class);
523 NodeId master = mastershipService.getMasterFor(deviceId);
524
525 if (master == null) {
526 log.debug("Failed to getTableStats: No master for {}", deviceId);
527 return Collections.emptyList();
528 }
529
530 if (deviceTableStats.get(networkId) == null) {
531 deviceTableStats.put(networkId, Maps.newConcurrentMap());
532 }
533
534 List<TableStatisticsEntry> tableStats = deviceTableStats.get(networkId).get(deviceId);
535 if (tableStats == null) {
536 return Collections.emptyList();
537 }
538 return ImmutableList.copyOf(tableStats);
539 }
540
541 private void registerMessageHandlers(ExecutorService executor) {
542 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
543 clusterCommunicator.<VirtualFlowRuleBatchEvent>addSubscriber(
544 REMOTE_APPLY_COMPLETED, serializer::decode,
545 this::notifyDelicateByNetwork, executor);
546 clusterCommunicator.addSubscriber(
547 GET_FLOW_ENTRY, serializer::decode, this::getFlowEntryByNetwork,
548 serializer::encode, executor);
549 clusterCommunicator.addSubscriber(
550 GET_DEVICE_FLOW_ENTRIES, serializer::decode,
551 this::getFlowEntriesByNetwork,
552 serializer::encode, executor);
553 clusterCommunicator.addSubscriber(
554 REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal,
555 serializer::encode, executor);
556 }
557
558 private void unregisterMessageHandlers() {
559 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
560 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
561 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
562 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
563 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
564 }
565
566
567 private void logConfig(String prefix) {
568 log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}",
Thomas Vachuskaedf9c4d2018-10-15 19:14:19 -0700569 prefix, msgHandlerThreadPoolSize, backupPeriod);
Yoonseon Hane026a7f2017-08-23 06:05:25 +0900570 }
571
572 private void storeBatchInternal(NetworkId networkId, FlowRuleBatchOperation operation) {
573
574 final DeviceId did = operation.deviceId();
575 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
576 Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(networkId, operation);
577 if (currentOps.isEmpty()) {
578 batchOperationComplete(networkId, FlowRuleBatchEvent.completed(
579 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
580 new CompletedBatchOperation(true, Collections.emptySet(), did)));
581 return;
582 }
583
584 //Confirm that flowrule service is created
585 vnaService.get(networkId, FlowRuleService.class);
586
587 notifyDelegate(networkId, FlowRuleBatchEvent.requested(new
588 FlowRuleBatchRequest(operation.id(),
589 currentOps), operation.deviceId()));
590 }
591
592 private Set<FlowRuleBatchEntry> updateStoreInternal(NetworkId networkId,
593 FlowRuleBatchOperation operation) {
594 return operation.getOperations().stream().map(
595 op -> {
596 StoredFlowEntry entry;
597 switch (op.operator()) {
598 case ADD:
599 entry = new DefaultFlowEntry(op.target());
600 // always add requested FlowRule
601 // Note: 2 equal FlowEntry may have different treatment
602 flowTable.remove(networkId, entry.deviceId(), entry);
603 flowTable.add(networkId, entry);
604
605 return op;
606 case REMOVE:
607 entry = flowTable.getFlowEntry(networkId, op.target());
608 if (entry != null) {
609 //FIXME modification of "stored" flow entry outside of flow table
610 entry.setState(FlowEntry.FlowEntryState.PENDING_REMOVE);
611 log.debug("Setting state of rule to pending remove: {}", entry);
612 return op;
613 }
614 break;
615 case MODIFY:
616 //TODO: figure this out at some point
617 break;
618 default:
619 log.warn("Unknown flow operation operator: {}", op.operator());
620 }
621 return null;
622 }
623 ).filter(Objects::nonNull).collect(Collectors.toSet());
624 }
625
626 private FlowRuleEvent removeFlowRuleInternal(VirtualFlowEntry rule) {
627 final DeviceId deviceId = rule.flowEntry().deviceId();
628 // This is where one could mark a rule as removed and still keep it in the store.
629 final FlowEntry removed = flowTable.remove(rule.networkId(), deviceId, rule.flowEntry());
630 // rule may be partial rule that is missing treatment, we should use rule from store instead
631 return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
632 }
633
634 private final class OnStoreBatch implements ClusterMessageHandler {
635
636 @Override
637 public void handle(final ClusterMessage message) {
638 VirtualFlowRuleBatchOperation vOperation = serializer.decode(message.payload());
639 log.debug("received batch request {}", vOperation);
640
641 FlowRuleBatchOperation operation = vOperation.operation();
642
643 final DeviceId deviceId = operation.deviceId();
644 MastershipService mastershipService =
645 vnaService.get(vOperation.networkId(), MastershipService.class);
646 NodeId master = mastershipService.getMasterFor(deviceId);
647 if (!Objects.equals(local, master)) {
648 Set<FlowRule> failures = new HashSet<>(operation.size());
649 for (FlowRuleBatchEntry op : operation.getOperations()) {
650 failures.add(op.target());
651 }
652 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
653 // This node is no longer the master, respond as all failed.
654 // TODO: we might want to wrap response in envelope
655 // to distinguish sw programming failure and hand over
656 // it make sense in the latter case to retry immediately.
657 message.respond(serializer.encode(allFailed));
658 return;
659 }
660
661 pendingResponses.put(operation.id(), message.sender());
662 storeBatchInternal(vOperation.networkId(), operation);
663 }
664 }
665
666 /**
667 * Returns flow rule entry using virtual flow rule.
668 *
669 * @param rule an encapsulated flow rule to be queried
670 */
671 private FlowEntry getFlowEntryByNetwork(VirtualFlowRule rule) {
672 return flowTable.getFlowEntry(rule.networkId(), rule.rule());
673 }
674
675 /**
676 * returns flow entries using virtual device id.
677 *
678 * @param deviceId an encapsulated virtual device id
679 * @return a set of flow entries
680 */
681 private Set<FlowEntry> getFlowEntriesByNetwork(VirtualDeviceId deviceId) {
682 return flowTable.getFlowEntries(deviceId.networkId(), deviceId.deviceId());
683 }
684
685 /**
686 * span out Flow Rule Batch event according to virtual network id.
687 *
688 * @param event a event to be span out
689 */
690 private void notifyDelicateByNetwork(VirtualFlowRuleBatchEvent event) {
691 batchOperationComplete(event.networkId(), event.event());
692 }
693
694 private class InternalFlowTable {
695 //TODO replace the Map<V,V> with ExtendedSet
696 //TODO: support backup mechanism
697 private final Map<NetworkId, Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>>
698 flowEntriesMap = Maps.newConcurrentMap();
699 private final Map<NetworkId, Map<DeviceId, Long>> lastUpdateTimesMap = Maps.newConcurrentMap();
700
701 private Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
702 getFlowEntriesByNetwork(NetworkId networkId) {
703 return flowEntriesMap.computeIfAbsent(networkId, k -> Maps.newConcurrentMap());
704 }
705
706 private Map<DeviceId, Long> getLastUpdateTimesByNetwork(NetworkId networkId) {
707 return lastUpdateTimesMap.computeIfAbsent(networkId, k -> Maps.newConcurrentMap());
708 }
709
710 /**
711 * Returns the flow table for specified device.
712 *
713 * @param networkId virtual network identifier
714 * @param deviceId identifier of the device
715 * @return Map representing Flow Table of given device.
716 */
717 private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>
718 getFlowTable(NetworkId networkId, DeviceId deviceId) {
719 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
720 flowEntries = getFlowEntriesByNetwork(networkId);
721 if (persistenceEnabled) {
722 //TODO: support persistent
723 log.warn("Persistent is not supported");
724 return null;
725 } else {
726 return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
727 }
728 }
729
730 private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>
731 getFlowTableCopy(NetworkId networkId, DeviceId deviceId) {
732
733 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
734 flowEntries = getFlowEntriesByNetwork(networkId);
735 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = Maps.newHashMap();
736
737 if (persistenceEnabled) {
738 //TODO: support persistent
739 log.warn("Persistent is not supported");
740 return null;
741 } else {
742 flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap()).forEach((k, v) -> {
743 copy.put(k, Maps.newHashMap(v));
744 });
745 return copy;
746 }
747 }
748
749 private Map<StoredFlowEntry, StoredFlowEntry>
750 getFlowEntriesInternal(NetworkId networkId, DeviceId deviceId, FlowId flowId) {
751
752 return getFlowTable(networkId, deviceId)
753 .computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
754 }
755
756 private StoredFlowEntry getFlowEntryInternal(NetworkId networkId, FlowRule rule) {
757
758 return getFlowEntriesInternal(networkId, rule.deviceId(), rule.id()).get(rule);
759 }
760
761 private Set<FlowEntry> getFlowEntriesInternal(NetworkId networkId, DeviceId deviceId) {
762
763 return getFlowTable(networkId, deviceId).values().stream()
764 .flatMap(m -> m.values().stream())
765 .collect(Collectors.toSet());
766 }
767
768 public StoredFlowEntry getFlowEntry(NetworkId networkId, FlowRule rule) {
769 return getFlowEntryInternal(networkId, rule);
770 }
771
772 public Set<FlowEntry> getFlowEntries(NetworkId networkId, DeviceId deviceId) {
773
774 return getFlowEntriesInternal(networkId, deviceId);
775 }
776
777 public void add(NetworkId networkId, FlowEntry rule) {
778 Map<DeviceId, Long> lastUpdateTimes = getLastUpdateTimesByNetwork(networkId);
779
780 getFlowEntriesInternal(networkId, rule.deviceId(), rule.id())
781 .compute((StoredFlowEntry) rule, (k, stored) -> {
782 //TODO compare stored and rule timestamps
783 //TODO the key is not updated
784 return (StoredFlowEntry) rule;
785 });
786 lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
787 }
788
789 public FlowEntry remove(NetworkId networkId, DeviceId deviceId, FlowEntry rule) {
790 final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
791 Map<DeviceId, Long> lastUpdateTimes = getLastUpdateTimesByNetwork(networkId);
792
793 getFlowEntriesInternal(networkId, rule.deviceId(), rule.id())
794 .computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
795 if (rule instanceof DefaultFlowEntry) {
796 DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
797 if (stored instanceof DefaultFlowEntry) {
798 DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
799 if (toRemove.created() < storedEntry.created()) {
800 log.debug("Trying to remove more recent flow entry {} (stored: {})",
801 toRemove, stored);
802 // the key is not updated, removedRule remains null
803 return stored;
804 }
805 }
806 }
807 removedRule.set(stored);
808 return null;
809 });
810
811 if (removedRule.get() != null) {
812 lastUpdateTimes.put(deviceId, System.currentTimeMillis());
813 return removedRule.get();
814 } else {
815 return null;
816 }
817 }
818
819 public void purgeFlowRule(NetworkId networkId, DeviceId deviceId) {
820 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
821 flowEntries = getFlowEntriesByNetwork(networkId);
822 flowEntries.remove(deviceId);
823 }
824
825 public void purgeFlowRules(NetworkId networkId) {
826 Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
827 flowEntries = getFlowEntriesByNetwork(networkId);
828 flowEntries.clear();
829 }
830 }
831
832 private class InternalTableStatsListener
833 implements EventuallyConsistentMapListener<NetworkId, Map<DeviceId, List<TableStatisticsEntry>>> {
834
835 @Override
836 public void event(EventuallyConsistentMapEvent<NetworkId, Map<DeviceId,
837 List<TableStatisticsEntry>>> event) {
838 //TODO: Generate an event to listeners (do we need?)
839 }
840 }
841
842 public final class MastershipBasedTimestamp implements Timestamp {
843
844 private final long termNumber;
845 private final long sequenceNumber;
846
847 /**
848 * Default constructor for serialization.
849 */
850 protected MastershipBasedTimestamp() {
851 this.termNumber = -1;
852 this.sequenceNumber = -1;
853 }
854
855 /**
856 * Default version tuple.
857 *
858 * @param termNumber the mastership termNumber
859 * @param sequenceNumber the sequenceNumber number within the termNumber
860 */
861 public MastershipBasedTimestamp(long termNumber, long sequenceNumber) {
862 this.termNumber = termNumber;
863 this.sequenceNumber = sequenceNumber;
864 }
865
866 @Override
867 public int compareTo(Timestamp o) {
868 checkArgument(o instanceof MastershipBasedTimestamp,
869 "Must be MastershipBasedTimestamp", o);
870 MastershipBasedTimestamp that = (MastershipBasedTimestamp) o;
871
872 return ComparisonChain.start()
873 .compare(this.termNumber, that.termNumber)
874 .compare(this.sequenceNumber, that.sequenceNumber)
875 .result();
876 }
877
878 @Override
879 public int hashCode() {
880 return Objects.hash(termNumber, sequenceNumber);
881 }
882
883 @Override
884 public boolean equals(Object obj) {
885 if (this == obj) {
886 return true;
887 }
888 if (!(obj instanceof MastershipBasedTimestamp)) {
889 return false;
890 }
891 MastershipBasedTimestamp that = (MastershipBasedTimestamp) obj;
892 return Objects.equals(this.termNumber, that.termNumber) &&
893 Objects.equals(this.sequenceNumber, that.sequenceNumber);
894 }
895
896 @Override
897 public String toString() {
898 return MoreObjects.toStringHelper(getClass())
899 .add("termNumber", termNumber)
900 .add("sequenceNumber", sequenceNumber)
901 .toString();
902 }
903
904 /**
905 * Returns the termNumber.
906 *
907 * @return termNumber
908 */
909 public long termNumber() {
910 return termNumber;
911 }
912
913 /**
914 * Returns the sequenceNumber number.
915 *
916 * @return sequenceNumber
917 */
918 public long sequenceNumber() {
919 return sequenceNumber;
920 }
921 }
922}