Revert "[ONOS-6542] (vNet) Distributed Flow Rule store"
This reverts commit f1197235a2bccd20d8fa551f7d4b3370eb9be3dd.
Change-Id: Ic24a8b96eb2126176d8eb3de7fa4ebe57e381a98
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/DistributedVirtualFlowRuleStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/DistributedVirtualFlowRuleStore.java
deleted file mode 100644
index 6da0040..0000000
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/DistributedVirtualFlowRuleStore.java
+++ /dev/null
@@ -1,828 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.incubator.store.virtual.impl;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Futures;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Modified;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.CoreService;
-import org.onosproject.core.IdGenerator;
-import org.onosproject.incubator.net.virtual.NetworkId;
-import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
-import org.onosproject.incubator.net.virtual.VirtualNetworkService;
-import org.onosproject.incubator.store.virtual.impl.primitives.VirtualDeviceId;
-import org.onosproject.incubator.store.virtual.impl.primitives.VirtualFlowEntry;
-import org.onosproject.incubator.store.virtual.impl.primitives.VirtualFlowRule;
-import org.onosproject.incubator.store.virtual.impl.primitives.VirtualFlowRuleBatchEvent;
-import org.onosproject.incubator.store.virtual.impl.primitives.VirtualFlowRuleBatchOperation;
-import org.onosproject.mastership.MastershipService;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.flow.CompletedBatchOperation;
-import org.onosproject.net.flow.DefaultFlowEntry;
-import org.onosproject.net.flow.FlowEntry;
-import org.onosproject.net.flow.FlowId;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.FlowRuleBatchEntry;
-import org.onosproject.net.flow.FlowRuleBatchEvent;
-import org.onosproject.net.flow.FlowRuleBatchOperation;
-import org.onosproject.net.flow.FlowRuleBatchRequest;
-import org.onosproject.net.flow.FlowRuleEvent;
-import org.onosproject.net.flow.FlowRuleStoreDelegate;
-import org.onosproject.net.flow.StoredFlowEntry;
-import org.onosproject.net.flow.TableStatisticsEntry;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.flow.impl.FlowStoreMessageSubjects;
-import org.onosproject.store.impl.MastershipBasedTimestamp;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.WallClockTimestamp;
-import org.osgi.service.component.ComponentContext;
-import org.slf4j.Logger;
-
-import java.util.Collections;
-import java.util.Dictionary;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Manages inventory of flow rules using a distributed state management protocol
- * for virtual networks.
- */
-//TODO: support backup and persistent mechanism
-@Component(immediate = true, enabled = false)
-@Service
-public class DistributedVirtualFlowRuleStore
- extends AbstractVirtualStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
- implements VirtualNetworkFlowRuleStore {
-
- private final Logger log = getLogger(getClass());
-
- private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
- private static final boolean DEFAULT_PERSISTENCE_ENABLED = false;
- private static final int DEFAULT_BACKUP_PERIOD_MILLIS = 2000;
- private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
-
- private static final String FLOW_OP_TOPIC = "virtual-flow-ops-ids";
-
- // MessageSubjects used by DistributedVirtualFlowRuleStore peer-peer communication.
- private static final MessageSubject APPLY_BATCH_FLOWS
- = new MessageSubject("virtual-peer-forward-apply-batch");
- private static final MessageSubject GET_FLOW_ENTRY
- = new MessageSubject("virtual-peer-forward-get-flow-entry");
- private static final MessageSubject GET_DEVICE_FLOW_ENTRIES
- = new MessageSubject("virtual-peer-forward-get-device-flow-entries");
- private static final MessageSubject REMOVE_FLOW_ENTRY
- = new MessageSubject("virtual-peer-forward-remove-flow-entry");
- private static final MessageSubject REMOTE_APPLY_COMPLETED
- = new MessageSubject("virtual-peer-apply-completed");
-
- @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
- label = "Number of threads in the message handler pool")
- private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
-
- @Property(name = "backupPeriod", intValue = DEFAULT_BACKUP_PERIOD_MILLIS,
- label = "Delay in ms between successive backup runs")
- private int backupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
- @Property(name = "persistenceEnabled", boolValue = false,
- label = "Indicates whether or not changes in the flow table should be persisted to disk.")
- private boolean persistenceEnabled = DEFAULT_PERSISTENCE_ENABLED;
-
- private InternalFlowTable flowTable = new InternalFlowTable();
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CoreService coreService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ComponentConfigService configService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected VirtualNetworkService vnaService;
-
- private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
- private ExecutorService messageHandlingExecutor;
- private ExecutorService eventHandler;
-
- private EventuallyConsistentMap<NetworkId, Map<DeviceId, List<TableStatisticsEntry>>> deviceTableStats;
- private final EventuallyConsistentMapListener<NetworkId, Map<DeviceId, List<TableStatisticsEntry>>>
- tableStatsListener = new InternalTableStatsListener();
-
- protected final Serializer serializer = Serializer.using(KryoNamespaces.API);
-
- protected final KryoNamespace.Builder serializerBuilder = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(MastershipBasedTimestamp.class);
-
- private IdGenerator idGenerator;
- private NodeId local;
-
-
- @Activate
- public void activate(ComponentContext context) {
- configService.registerProperties(getClass());
-
- idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
-
- local = clusterService.getLocalNode().id();
-
- eventHandler = Executors.newSingleThreadExecutor(
- groupedThreads("onos/virtual-flow", "event-handler", log));
- messageHandlingExecutor = Executors.newFixedThreadPool(
- msgHandlerPoolSize, groupedThreads("onos/store/virtual-flow", "message-handlers", log));
-
- registerMessageHandlers(messageHandlingExecutor);
-
- deviceTableStats = storageService
- .<NetworkId, Map<DeviceId, List<TableStatisticsEntry>>>eventuallyConsistentMapBuilder()
- .withName("onos-virtual-flow-table-stats")
- .withSerializer(serializerBuilder)
- .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
- .withTimestampProvider((k, v) -> new WallClockTimestamp())
- .withTombstonesDisabled()
- .build();
- deviceTableStats.addListener(tableStatsListener);
-
- logConfig("Started");
- }
-
- @Deactivate
- public void deactivate(ComponentContext context) {
- configService.unregisterProperties(getClass(), false);
- unregisterMessageHandlers();
- deviceTableStats.removeListener(tableStatsListener);
- deviceTableStats.destroy();
- eventHandler.shutdownNow();
- messageHandlingExecutor.shutdownNow();
- log.info("Stopped");
- }
-
- @SuppressWarnings("rawtypes")
- @Modified
- public void modified(ComponentContext context) {
- if (context == null) {
- logConfig("Default config");
- return;
- }
-
- Dictionary properties = context.getProperties();
- int newPoolSize;
- int newBackupPeriod;
- try {
- String s = get(properties, "msgHandlerPoolSize");
- newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
-
- s = get(properties, "backupPeriod");
- newBackupPeriod = isNullOrEmpty(s) ? backupPeriod : Integer.parseInt(s.trim());
- } catch (NumberFormatException | ClassCastException e) {
- newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
- newBackupPeriod = DEFAULT_BACKUP_PERIOD_MILLIS;
- }
-
- boolean restartBackupTask = false;
-
- if (newBackupPeriod != backupPeriod) {
- backupPeriod = newBackupPeriod;
- restartBackupTask = true;
- }
- if (restartBackupTask) {
- log.warn("Currently, backup tasks are not supported.");
- }
- if (newPoolSize != msgHandlerPoolSize) {
- msgHandlerPoolSize = newPoolSize;
- ExecutorService oldMsgHandler = messageHandlingExecutor;
- messageHandlingExecutor = Executors.newFixedThreadPool(
- msgHandlerPoolSize, groupedThreads("onos/store/virtual-flow", "message-handlers", log));
-
- // replace previously registered handlers.
- registerMessageHandlers(messageHandlingExecutor);
- oldMsgHandler.shutdown();
- }
-
- logConfig("Reconfigured");
- }
-
- @Override
- public int getFlowRuleCount(NetworkId networkId) {
- AtomicInteger sum = new AtomicInteger(0);
- DeviceService deviceService = vnaService.get(networkId, DeviceService.class);
- deviceService.getDevices()
- .forEach(device -> sum.addAndGet(
- Iterables.size(getFlowEntries(networkId, device.id()))));
- return sum.get();
- }
-
- @Override
- public FlowEntry getFlowEntry(NetworkId networkId, FlowRule rule) {
- MastershipService mastershipService =
- vnaService.get(networkId, MastershipService.class);
- NodeId master = mastershipService.getMasterFor(rule.deviceId());
-
- if (master == null) {
- log.debug("Failed to getFlowEntry: No master for {}, vnet {}",
- rule.deviceId(), networkId);
- return null;
- }
-
- if (Objects.equals(local, master)) {
- return flowTable.getFlowEntry(networkId, rule);
- }
-
- log.trace("Forwarding getFlowEntry to {}, which is the primary (master) " +
- "for device {}, vnet {}",
- master, rule.deviceId(), networkId);
-
- VirtualFlowRule vRule = new VirtualFlowRule(networkId, rule);
-
- return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(vRule,
- FlowStoreMessageSubjects.GET_FLOW_ENTRY,
- serializer::encode,
- serializer::decode,
- master),
- FLOW_RULE_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- null);
- }
-
- @Override
- public Iterable<FlowEntry> getFlowEntries(NetworkId networkId, DeviceId deviceId) {
- MastershipService mastershipService =
- vnaService.get(networkId, MastershipService.class);
- NodeId master = mastershipService.getMasterFor(deviceId);
-
- if (master == null) {
- log.debug("Failed to getFlowEntries: No master for {}, vnet {}", deviceId, networkId);
- return Collections.emptyList();
- }
-
- if (Objects.equals(local, master)) {
- return flowTable.getFlowEntries(networkId, deviceId);
- }
-
- log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
- master, deviceId);
-
- return Tools.futureGetOrElse(
- clusterCommunicator.sendAndReceive(deviceId,
- FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
- serializer::encode,
- serializer::decode,
- master),
- FLOW_RULE_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- Collections.emptyList());
- }
-
- @Override
- public void storeBatch(NetworkId networkId, FlowRuleBatchOperation operation) {
- if (operation.getOperations().isEmpty()) {
- notifyDelegate(networkId, FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
- return;
- }
-
- DeviceId deviceId = operation.deviceId();
- MastershipService mastershipService =
- vnaService.get(networkId, MastershipService.class);
- NodeId master = mastershipService.getMasterFor(deviceId);
-
- if (master == null) {
- log.warn("No master for {}, vnet {} : flows will be marked for removal", deviceId, networkId);
-
- updateStoreInternal(networkId, operation);
-
- notifyDelegate(networkId, FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
- return;
- }
-
- if (Objects.equals(local, master)) {
- storeBatchInternal(networkId, operation);
- return;
- }
-
- log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}, vent {}",
- master, deviceId, networkId);
-
- clusterCommunicator.unicast(new VirtualFlowRuleBatchOperation(networkId, operation),
- APPLY_BATCH_FLOWS,
- serializer::encode,
- master)
- .whenComplete((result, error) -> {
- if (error != null) {
- log.warn("Failed to storeBatch: {} to {}", operation, master, error);
-
- Set<FlowRule> allFailures = operation.getOperations()
- .stream()
- .map(op -> op.target())
- .collect(Collectors.toSet());
-
- notifyDelegate(networkId, FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(false, allFailures, deviceId)));
- }
- });
- }
-
- @Override
- public void batchOperationComplete(NetworkId networkId, FlowRuleBatchEvent event) {
- //FIXME: need a per device pending response
- NodeId nodeId = pendingResponses.remove(event.subject().batchId());
- if (nodeId == null) {
- notifyDelegate(networkId, event);
- } else {
- // TODO check unicast return value
- clusterCommunicator.unicast(new VirtualFlowRuleBatchEvent(networkId, event),
- REMOTE_APPLY_COMPLETED, serializer::encode, nodeId);
- //error log: log.warn("Failed to respond to peer for batch operation result");
- }
- }
-
- @Override
- public void deleteFlowRule(NetworkId networkId, FlowRule rule) {
- storeBatch(networkId,
- new FlowRuleBatchOperation(
- Collections.singletonList(
- new FlowRuleBatchEntry(
- FlowRuleBatchEntry.FlowRuleOperation.REMOVE,
- rule)), rule.deviceId(), idGenerator.getNewId()));
- }
-
- @Override
- public FlowRuleEvent addOrUpdateFlowRule(NetworkId networkId, FlowEntry rule) {
- MastershipService mastershipService =
- vnaService.get(networkId, MastershipService.class);
- NodeId master = mastershipService.getMasterFor(rule.deviceId());
- if (Objects.equals(local, master)) {
- return addOrUpdateFlowRuleInternal(networkId, rule);
- }
-
- log.warn("Tried to update FlowRule {} state,"
- + " while the Node was not the master.", rule);
- return null;
- }
-
- private FlowRuleEvent addOrUpdateFlowRuleInternal(NetworkId networkId, FlowEntry rule) {
- // check if this new rule is an update to an existing entry
- StoredFlowEntry stored = flowTable.getFlowEntry(networkId, rule);
- if (stored != null) {
- //FIXME modification of "stored" flow entry outside of flow table
- stored.setBytes(rule.bytes());
- stored.setLife(rule.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
- stored.setLiveType(rule.liveType());
- stored.setPackets(rule.packets());
- stored.setLastSeen();
- if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
- stored.setState(FlowEntry.FlowEntryState.ADDED);
- return new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, rule);
- }
- return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule);
- }
-
- // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
- // TODO: also update backup if the behavior is correct.
- flowTable.add(networkId, rule);
- return null;
- }
-
- @Override
- public FlowRuleEvent removeFlowRule(NetworkId networkId, FlowEntry rule) {
- final DeviceId deviceId = rule.deviceId();
-
- MastershipService mastershipService =
- vnaService.get(networkId, MastershipService.class);
- NodeId master = mastershipService.getMasterFor(deviceId);
-
- if (Objects.equals(local, master)) {
- // bypass and handle it locally
- return removeFlowRuleInternal(new VirtualFlowEntry(networkId, rule));
- }
-
- if (master == null) {
- log.warn("Failed to removeFlowRule: No master for {}", deviceId);
- // TODO: revisit if this should be null (="no-op") or Exception
- return null;
- }
-
- log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
- master, deviceId);
-
- return Futures.getUnchecked(clusterCommunicator.sendAndReceive(
- new VirtualFlowEntry(networkId, rule),
- REMOVE_FLOW_ENTRY,
- serializer::encode,
- serializer::decode,
- master));
- }
-
- @Override
- public FlowRuleEvent pendingFlowRule(NetworkId networkId, FlowEntry rule) {
- MastershipService mastershipService =
- vnaService.get(networkId, MastershipService.class);
- if (mastershipService.isLocalMaster(rule.deviceId())) {
- StoredFlowEntry stored = flowTable.getFlowEntry(networkId, rule);
- if (stored != null &&
- stored.state() != FlowEntry.FlowEntryState.PENDING_ADD) {
- stored.setState(FlowEntry.FlowEntryState.PENDING_ADD);
- return new FlowRuleEvent(FlowRuleEvent.Type.RULE_UPDATED, rule);
- }
- }
- return null;
- }
-
- @Override
- public void purgeFlowRules(NetworkId networkId) {
- flowTable.purgeFlowRules(networkId);
- }
-
- @Override
- public FlowRuleEvent updateTableStatistics(NetworkId networkId,
- DeviceId deviceId,
- List<TableStatisticsEntry> tableStats) {
- if (deviceTableStats.get(networkId) == null) {
- deviceTableStats.put(networkId, Maps.newConcurrentMap());
- }
- deviceTableStats.get(networkId).put(deviceId, tableStats);
- return null;
- }
-
- @Override
- public Iterable<TableStatisticsEntry> getTableStatistics(NetworkId networkId, DeviceId deviceId) {
- MastershipService mastershipService =
- vnaService.get(networkId, MastershipService.class);
- NodeId master = mastershipService.getMasterFor(deviceId);
-
- if (master == null) {
- log.debug("Failed to getTableStats: No master for {}", deviceId);
- return Collections.emptyList();
- }
-
- if (deviceTableStats.get(networkId) == null) {
- deviceTableStats.put(networkId, Maps.newConcurrentMap());
- }
-
- List<TableStatisticsEntry> tableStats = deviceTableStats.get(networkId).get(deviceId);
- if (tableStats == null) {
- return Collections.emptyList();
- }
- return ImmutableList.copyOf(tableStats);
- }
-
- private void registerMessageHandlers(ExecutorService executor) {
- clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
- clusterCommunicator.<VirtualFlowRuleBatchEvent>addSubscriber(
- REMOTE_APPLY_COMPLETED, serializer::decode,
- this::notifyDelicateByNetwork, executor);
- clusterCommunicator.addSubscriber(
- GET_FLOW_ENTRY, serializer::decode, this::getFlowEntryByNetwork,
- serializer::encode, executor);
- clusterCommunicator.addSubscriber(
- GET_DEVICE_FLOW_ENTRIES, serializer::decode,
- this::getFlowEntriesByNetwork,
- serializer::encode, executor);
- clusterCommunicator.addSubscriber(
- REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal,
- serializer::encode, executor);
- }
-
- private void unregisterMessageHandlers() {
- clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
- clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
- clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
- clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
- clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
- }
-
-
- private void logConfig(String prefix) {
- log.info("{} with msgHandlerPoolSize = {}; backupPeriod = {}",
- prefix, msgHandlerPoolSize, backupPeriod);
- }
-
- private void storeBatchInternal(NetworkId networkId, FlowRuleBatchOperation operation) {
-
- final DeviceId did = operation.deviceId();
- //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
- Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(networkId, operation);
- if (currentOps.isEmpty()) {
- batchOperationComplete(networkId, FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), did)));
- return;
- }
-
- notifyDelegate(networkId, FlowRuleBatchEvent.requested(new
- FlowRuleBatchRequest(operation.id(),
- currentOps), operation.deviceId()));
- }
-
- private Set<FlowRuleBatchEntry> updateStoreInternal(NetworkId networkId,
- FlowRuleBatchOperation operation) {
- return operation.getOperations().stream().map(
- op -> {
- StoredFlowEntry entry;
- switch (op.operator()) {
- case ADD:
- entry = new DefaultFlowEntry(op.target());
- // always add requested FlowRule
- // Note: 2 equal FlowEntry may have different treatment
- flowTable.remove(networkId, entry.deviceId(), entry);
- flowTable.add(networkId, entry);
-
- return op;
- case REMOVE:
- entry = flowTable.getFlowEntry(networkId, op.target());
- if (entry != null) {
- //FIXME modification of "stored" flow entry outside of flow table
- entry.setState(FlowEntry.FlowEntryState.PENDING_REMOVE);
- log.debug("Setting state of rule to pending remove: {}", entry);
- return op;
- }
- break;
- case MODIFY:
- //TODO: figure this out at some point
- break;
- default:
- log.warn("Unknown flow operation operator: {}", op.operator());
- }
- return null;
- }
- ).filter(Objects::nonNull).collect(Collectors.toSet());
- }
-
- private FlowRuleEvent removeFlowRuleInternal(VirtualFlowEntry rule) {
- final DeviceId deviceId = rule.flowEntry().deviceId();
- // This is where one could mark a rule as removed and still keep it in the store.
- final FlowEntry removed = flowTable.remove(rule.networkId(), deviceId, rule.flowEntry());
- // rule may be partial rule that is missing treatment, we should use rule from store instead
- return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null;
- }
-
- private final class OnStoreBatch implements ClusterMessageHandler {
-
- @Override
- public void handle(final ClusterMessage message) {
- VirtualFlowRuleBatchOperation vOperation = serializer.decode(message.payload());
- log.debug("received batch request {}", vOperation);
-
- FlowRuleBatchOperation operation = vOperation.operation();
-
- final DeviceId deviceId = operation.deviceId();
- MastershipService mastershipService =
- vnaService.get(vOperation.networkId(), MastershipService.class);
- NodeId master = mastershipService.getMasterFor(deviceId);
- if (!Objects.equals(local, master)) {
- Set<FlowRule> failures = new HashSet<>(operation.size());
- for (FlowRuleBatchEntry op : operation.getOperations()) {
- failures.add(op.target());
- }
- CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
- // This node is no longer the master, respond as all failed.
- // TODO: we might want to wrap response in envelope
- // to distinguish sw programming failure and hand over
- // it make sense in the latter case to retry immediately.
- message.respond(serializer.encode(allFailed));
- return;
- }
-
- pendingResponses.put(operation.id(), message.sender());
- storeBatchInternal(vOperation.networkId(), operation);
- }
- }
-
- /**
- * Returns flow rule entry using virtual flow rule.
- *
- * @param rule an encapsulated flow rule to be queried
- */
- private FlowEntry getFlowEntryByNetwork(VirtualFlowRule rule) {
- return flowTable.getFlowEntry(rule.networkId(), rule.rule());
- }
-
- /**
- * returns flow entries using virtual device id.
- *
- * @param deviceId an encapsulated virtual device id
- * @return a set of flow entries
- */
- private Set<FlowEntry> getFlowEntriesByNetwork(VirtualDeviceId deviceId) {
- return flowTable.getFlowEntries(deviceId.networkId(), deviceId.deviceId());
- }
-
- /**
- * span out Flow Rule Batch event according to virtual network id.
- *
- * @param event a event to be span out
- */
- private void notifyDelicateByNetwork(VirtualFlowRuleBatchEvent event) {
- notifyDelegate(event.networkId(), event.event());
- }
-
- private class InternalFlowTable {
- //TODO replace the Map<V,V> with ExtendedSet
- //TODO: support backup mechanism
- private final Map<NetworkId, Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>>
- flowEntriesMap = Maps.newConcurrentMap();
- private final Map<NetworkId, Map<DeviceId, Long>> lastUpdateTimesMap = Maps.newConcurrentMap();
-
- private Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
- getFlowEntriesByNetwork(NetworkId networkId) {
- return flowEntriesMap.computeIfAbsent(networkId, k -> Maps.newConcurrentMap());
- }
-
- private Map<DeviceId, Long> getLastUpdateTimesByNetwork(NetworkId networkId) {
- return lastUpdateTimesMap.computeIfAbsent(networkId, k -> Maps.newConcurrentMap());
- }
-
- /**
- * Returns the flow table for specified device.
- *
- * @param networkId virtual network identifier
- * @param deviceId identifier of the device
- * @return Map representing Flow Table of given device.
- */
- private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>
- getFlowTable(NetworkId networkId, DeviceId deviceId) {
- Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
- flowEntries = getFlowEntriesByNetwork(networkId);
- if (persistenceEnabled) {
- //TODO: support persistent
- log.warn("Persistent is not supported");
- return null;
- } else {
- return flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap());
- }
- }
-
- private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>
- getFlowTableCopy(NetworkId networkId, DeviceId deviceId) {
-
- Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
- flowEntries = getFlowEntriesByNetwork(networkId);
- Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> copy = Maps.newHashMap();
-
- if (persistenceEnabled) {
- //TODO: support persistent
- log.warn("Persistent is not supported");
- return null;
- } else {
- flowEntries.computeIfAbsent(deviceId, id -> Maps.newConcurrentMap()).forEach((k, v) -> {
- copy.put(k, Maps.newHashMap(v));
- });
- return copy;
- }
- }
-
- private Map<StoredFlowEntry, StoredFlowEntry>
- getFlowEntriesInternal(NetworkId networkId, DeviceId deviceId, FlowId flowId) {
-
- return getFlowTable(networkId, deviceId)
- .computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
- }
-
- private StoredFlowEntry getFlowEntryInternal(NetworkId networkId, FlowRule rule) {
-
- return getFlowEntriesInternal(networkId, rule.deviceId(), rule.id()).get(rule);
- }
-
- private Set<FlowEntry> getFlowEntriesInternal(NetworkId networkId, DeviceId deviceId) {
-
- return getFlowTable(networkId, deviceId).values().stream()
- .flatMap(m -> m.values().stream())
- .collect(Collectors.toSet());
- }
-
- public StoredFlowEntry getFlowEntry(NetworkId networkId, FlowRule rule) {
- return getFlowEntryInternal(networkId, rule);
- }
-
- public Set<FlowEntry> getFlowEntries(NetworkId networkId, DeviceId deviceId) {
-
- return getFlowEntriesInternal(networkId, deviceId);
- }
-
- public void add(NetworkId networkId, FlowEntry rule) {
- Map<DeviceId, Long> lastUpdateTimes = getLastUpdateTimesByNetwork(networkId);
-
- getFlowEntriesInternal(networkId, rule.deviceId(), rule.id())
- .compute((StoredFlowEntry) rule, (k, stored) -> {
- //TODO compare stored and rule timestamps
- //TODO the key is not updated
- return (StoredFlowEntry) rule;
- });
- lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
- }
-
- public FlowEntry remove(NetworkId networkId, DeviceId deviceId, FlowEntry rule) {
- final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
- Map<DeviceId, Long> lastUpdateTimes = getLastUpdateTimesByNetwork(networkId);
-
- getFlowEntriesInternal(networkId, rule.deviceId(), rule.id())
- .computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
- if (rule instanceof DefaultFlowEntry) {
- DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
- if (stored instanceof DefaultFlowEntry) {
- DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
- if (toRemove.created() < storedEntry.created()) {
- log.debug("Trying to remove more recent flow entry {} (stored: {})",
- toRemove, stored);
- // the key is not updated, removedRule remains null
- return stored;
- }
- }
- }
- removedRule.set(stored);
- return null;
- });
-
- if (removedRule.get() != null) {
- lastUpdateTimes.put(deviceId, System.currentTimeMillis());
- return removedRule.get();
- } else {
- return null;
- }
- }
-
- public void purgeFlowRule(NetworkId networkId, DeviceId deviceId) {
- Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
- flowEntries = getFlowEntriesByNetwork(networkId);
- flowEntries.remove(deviceId);
- }
-
- public void purgeFlowRules(NetworkId networkId) {
- Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>
- flowEntries = getFlowEntriesByNetwork(networkId);
- flowEntries.clear();
- }
- }
-
- private class InternalTableStatsListener
- implements EventuallyConsistentMapListener<NetworkId, Map<DeviceId, List<TableStatisticsEntry>>> {
-
- @Override
- public void event(EventuallyConsistentMapEvent<NetworkId, Map<DeviceId,
- List<TableStatisticsEntry>>> event) {
- //TODO: Generate an event to listeners (do we need?)
- }
- }
-}
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/package-info.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/package-info.java
index 706c006..9bb2d3b 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/package-info.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/package-info.java
@@ -15,6 +15,6 @@
*/
/**
- * Implementation of virtual network stores.
+ * Implementation of distributed virtual network store.
*/
package org.onosproject.incubator.store.virtual.impl;
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/VirtualDeviceId.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/VirtualDeviceId.java
deleted file mode 100644
index c5e0d83..0000000
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/VirtualDeviceId.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.incubator.store.virtual.impl.primitives;
-
-import org.onosproject.incubator.net.virtual.NetworkId;
-import org.onosproject.net.DeviceId;
-
-import java.util.Objects;
-
-/**
- * A wrapper class to isolate device id from other virtual networks.
- */
-public class VirtualDeviceId {
-
- NetworkId networkId;
- DeviceId deviceId;
-
- public VirtualDeviceId(NetworkId networkId, DeviceId deviceId) {
- this.networkId = networkId;
- this.deviceId = deviceId;
- }
-
- public NetworkId networkId() {
- return networkId;
- }
-
- public DeviceId deviceId() {
- return deviceId;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(networkId, deviceId);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
-
- if (obj instanceof VirtualDeviceId) {
- VirtualDeviceId that = (VirtualDeviceId) obj;
- return this.deviceId.equals(that.deviceId) &&
- this.networkId.equals(that.networkId);
- }
- return false;
- }
-}
-
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/VirtualFlowEntry.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/VirtualFlowEntry.java
deleted file mode 100644
index d899b4c..0000000
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/VirtualFlowEntry.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.incubator.store.virtual.impl.primitives;
-
-import org.onosproject.incubator.net.virtual.NetworkId;
-import org.onosproject.net.flow.FlowEntry;
-
-import java.util.Objects;
-
-/**
- * A wrapper class to encapsulate flow entry.
- */
-public class VirtualFlowEntry {
- NetworkId networkId;
- FlowEntry flowEntry;
-
- public VirtualFlowEntry(NetworkId networkId, FlowEntry flowEntry) {
- this.networkId = networkId;
- this.flowEntry = flowEntry;
- }
-
- public NetworkId networkId() {
- return networkId;
- }
-
- public FlowEntry flowEntry() {
- return flowEntry;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(networkId, flowEntry);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other != null && other instanceof VirtualFlowEntry) {
- VirtualFlowEntry that = (VirtualFlowEntry) other;
- return this.networkId.equals(that.networkId) &&
- this.flowEntry.equals(that.flowEntry);
- } else {
- return false;
- }
- }
-}
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/VirtualFlowRule.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/VirtualFlowRule.java
deleted file mode 100644
index e5c706b..0000000
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/VirtualFlowRule.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.incubator.store.virtual.impl.primitives;
-
-import org.onosproject.incubator.net.virtual.NetworkId;
-import org.onosproject.net.flow.FlowRule;
-
-import java.util.Objects;
-
-/**
- * A wrapper class to encapsulate flow rule.
- */
-public class VirtualFlowRule {
- NetworkId networkId;
- FlowRule rule;
-
- public VirtualFlowRule(NetworkId networkId, FlowRule rule) {
- this.networkId = networkId;
- this.rule = rule;
- }
-
- public NetworkId networkId() {
- return networkId;
- }
-
- public FlowRule rule() {
- return rule;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(networkId, rule);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other != null && other instanceof VirtualFlowRule) {
- VirtualFlowRule that = (VirtualFlowRule) other;
- return this.networkId.equals(that.networkId) &&
- this.rule.equals(that.rule);
- } else {
- return false;
- }
- }
-}
-
-
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/VirtualFlowRuleBatchEvent.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/VirtualFlowRuleBatchEvent.java
deleted file mode 100644
index 301c5d1..0000000
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/VirtualFlowRuleBatchEvent.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.incubator.store.virtual.impl.primitives;
-
-import org.onosproject.incubator.net.virtual.NetworkId;
-import org.onosproject.net.flow.FlowRuleBatchEvent;
-
-import java.util.Objects;
-
-/**
- * A wrapper class to encapsulate flow rule batch event.
- */
-public class VirtualFlowRuleBatchEvent {
- NetworkId networkId;
- FlowRuleBatchEvent event;
-
- public VirtualFlowRuleBatchEvent(NetworkId networkId, FlowRuleBatchEvent event) {
- this.networkId = networkId;
- this.event = event;
- }
-
- public NetworkId networkId() {
- return networkId;
- }
-
- public FlowRuleBatchEvent event() {
- return event;
- }
-
-
- @Override
- public int hashCode() {
- return Objects.hash(networkId, event);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other != null && other instanceof VirtualFlowRuleBatchEvent) {
- VirtualFlowRuleBatchEvent that = (VirtualFlowRuleBatchEvent) other;
- return this.networkId.equals(that.networkId) &&
- this.event.equals(that.event);
- } else {
- return false;
- }
- }
-}
-
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/VirtualFlowRuleBatchOperation.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/VirtualFlowRuleBatchOperation.java
deleted file mode 100644
index a06597b..0000000
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/VirtualFlowRuleBatchOperation.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.incubator.store.virtual.impl.primitives;
-
-import org.onosproject.incubator.net.virtual.NetworkId;
-import org.onosproject.net.flow.FlowRuleBatchOperation;
-
-import java.util.Objects;
-
-/**
- * A wrapper class to encapsulate flow rule batch operation.
- */
-public class VirtualFlowRuleBatchOperation {
- NetworkId networkId;
- FlowRuleBatchOperation operation;
-
- public VirtualFlowRuleBatchOperation(NetworkId networkId,
- FlowRuleBatchOperation operation) {
- this.networkId = networkId;
- this.operation = operation;
- }
-
- public NetworkId networkId() {
- return networkId;
- }
-
- public FlowRuleBatchOperation operation() {
- return operation;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(networkId, operation);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other != null && other instanceof VirtualFlowRuleBatchOperation) {
- VirtualFlowRuleBatchOperation that = (VirtualFlowRuleBatchOperation) other;
- return this.networkId.equals(that.networkId) &&
- this.operation.equals(that.operation);
- } else {
- return false;
- }
- }
-}
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/package-info.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/package-info.java
deleted file mode 100644
index d8253a7..0000000
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/primitives/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Implementation of distributed virtual network store primitives.
- */
-package org.onosproject.incubator.store.virtual.impl.primitives;