[ONOS-5936] (vCore) Virtual FlowObjective Manager and Store
Changes
1. FlowObjective manager for virtual network is added
2. VirtualFlowObjective store is added
3. SimpleVirtualFlowObjectiveStore is implementation
4. Unit tests are added
Change-Id: I18ff1d440d1f85ca96fff36a33a8b67566031e2c
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkFlowObjectiveStore.java b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkFlowObjectiveStore.java
new file mode 100644
index 0000000..7340c13
--- /dev/null
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkFlowObjectiveStore.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.net.virtual;
+
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
+import org.onosproject.net.flowobjective.ObjectiveEvent;
+
+import java.util.Map;
+
+/**
+ * The flow objective store for virtual networks.
+ */
+public interface VirtualNetworkFlowObjectiveStore
+ extends VirtualStore<ObjectiveEvent, FlowObjectiveStoreDelegate> {
+
+ /**
+ * Adds a NextGroup to the store, by mapping it to the nextId as key,
+ * and replacing any previous mapping.
+ *
+ * @param networkId a virtual network identifier
+ * @param nextId an integer
+ * @param group a next group opaque object
+ */
+ void putNextGroup(NetworkId networkId, Integer nextId, NextGroup group);
+
+ /**
+ * Fetch a next group from the store.
+ *
+ * @param networkId a virtual network identifier
+ * @param nextId an integer used as key
+ * @return a next group, or null if group was not found
+ */
+ NextGroup getNextGroup(NetworkId networkId, Integer nextId);
+
+ /**
+ * Remove a next group mapping from the store.
+ *
+ * @param networkId a virtual network identifier
+ * @param nextId the key to remove from the store.
+ * @return the next group which mapped to the nextId and is now removed, or
+ * null if no group mapping existed in the store
+ */
+ NextGroup removeNextGroup(NetworkId networkId, Integer nextId);
+
+ /**
+ * Fetch all groups from the store and their mapping to nextIds.
+ *
+ * @param networkId a virtual network identifier
+ * @return a map that represents the current snapshot of Next-ids to NextGroups
+ */
+ Map<Integer, NextGroup> getAllGroups(NetworkId networkId);
+
+ /**
+ * Allocates a next objective id. This id is globally unique.
+ *
+ * @param networkId a virtual network identifier
+ * @return an integer
+ */
+ int allocateNextId(NetworkId networkId);
+}
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java
new file mode 100644
index 0000000..8eba5cf
--- /dev/null
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java
@@ -0,0 +1,628 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.net.virtual.impl;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.onlab.osgi.ServiceDirectory;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.incubator.net.virtual.AbstractVnetService;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
+import org.onosproject.incubator.net.virtual.VirtualNetworkService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.net.behaviour.Pipeliner;
+import org.onosproject.net.behaviour.PipelinerContext;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
+import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.flowobjective.ObjectiveEvent;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.GroupKey;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Provides implementation of the flow objective programming service for virtual networks.
+ */
+// NOTE: This manager is designed to provide flow objective programming service
+// for virtual networks. Actually, virtual networks don't need to consider
+// the different implementation of data-path pipeline. But, the interfaces
+// and usages of flow objective service are still valuable for virtual network.
+// This manager is working as an interpreter from FlowObjective to FlowRules
+// to provide symmetric interfaces with ONOS core services.
+// The behaviours are based on DefaultSingleTablePipeline.
+
+public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
+ implements FlowObjectiveService {
+
+ public static final int INSTALL_RETRY_ATTEMPTS = 5;
+ public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
+
+ private final Logger log = getLogger(getClass());
+
+ protected DeviceService deviceService;
+
+ // Note: The following dependencies are added on behalf of the pipeline
+ // driver behaviours to assure these services are available for their
+ // initialization.
+ protected FlowRuleService flowRuleService;
+
+ protected VirtualNetworkFlowObjectiveStore virtualFlowObjectiveStore;
+ protected FlowObjectiveStore flowObjectiveStore;
+ private final FlowObjectiveStoreDelegate delegate;
+
+ private final PipelinerContext context = new InnerPipelineContext();
+
+ private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
+ private final Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
+
+ // local store to track which nextObjectives were sent to which device
+ // for debugging purposes
+ private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
+
+ private ExecutorService executorService;
+
+ public VirtualNetworkFlowObjectiveManager(VirtualNetworkService manager,
+ NetworkId networkId) {
+ super(manager, networkId);
+
+ deviceService = manager.get(networkId(), DeviceService.class);
+ flowRuleService = manager.get(networkId(), FlowRuleService.class);
+
+ executorService = newFixedThreadPool(4, groupedThreads("onos/virtual/objective-installer", "%d", log));
+
+ virtualFlowObjectiveStore =
+ serviceDirectory.get(VirtualNetworkFlowObjectiveStore.class);
+ delegate = new InternalStoreDelegate();
+ virtualFlowObjectiveStore.setDelegate(networkId(), delegate);
+ flowObjectiveStore = new StoreConvertor();
+ }
+
+ @Override
+ public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
+ executorService.execute(new ObjectiveInstaller(deviceId, filteringObjective));
+ }
+
+ @Override
+ public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
+ if (queueObjective(deviceId, forwardingObjective)) {
+ return;
+ }
+ executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
+ }
+
+ @Override
+ public void next(DeviceId deviceId, NextObjective nextObjective) {
+ nextToDevice.put(nextObjective.id(), deviceId);
+ executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
+ }
+
+ @Override
+ public int allocateNextId() {
+ return flowObjectiveStore.allocateNextId();
+ }
+
+ @Override
+ public void initPolicy(String policy) {
+
+ }
+
+ @Override
+ public List<String> getNextMappings() {
+ List<String> mappings = new ArrayList<>();
+ Map<Integer, NextGroup> allnexts = flowObjectiveStore.getAllGroups();
+ // XXX if the NextGroup after de-serialization actually stored info of the deviceId
+ // then info on any nextObj could be retrieved from one controller instance.
+ // Right now the drivers on one instance can only fetch for next-ids that came
+ // to them.
+ // Also, we still need to send the right next-id to the right driver as potentially
+ // there can be different drivers for different devices. But on that account,
+ // no instance should be decoding for another instance's nextIds.
+
+ for (Map.Entry<Integer, NextGroup> e : allnexts.entrySet()) {
+ // get the device this next Objective was sent to
+ DeviceId deviceId = nextToDevice.get(e.getKey());
+ mappings.add("NextId " + e.getKey() + ": " +
+ ((deviceId != null) ? deviceId : "nextId not in this onos instance"));
+ if (deviceId != null) {
+ // this instance of the controller sent the nextObj to a driver
+ Pipeliner pipeliner = getDevicePipeliner(deviceId);
+ List<String> nextMappings = pipeliner.getNextMappings(e.getValue());
+ if (nextMappings != null) {
+ mappings.addAll(nextMappings);
+ }
+ }
+ }
+ return mappings;
+ }
+
+ @Override
+ public List<String> getPendingNexts() {
+ List<String> pendingNexts = new ArrayList<>();
+ for (Integer nextId : pendingForwards.keySet()) {
+ Set<PendingNext> pnext = pendingForwards.get(nextId);
+ StringBuilder pend = new StringBuilder();
+ pend.append("Next Id: ").append(Integer.toString(nextId))
+ .append(" :: ");
+ for (PendingNext pn : pnext) {
+ pend.append(Integer.toString(pn.forwardingObjective().id()))
+ .append(" ");
+ }
+ pendingNexts.add(pend.toString());
+ }
+ return pendingNexts;
+ }
+
+ private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
+ if (fwd.nextId() == null ||
+ flowObjectiveStore.getNextGroup(fwd.nextId()) != null) {
+ // fast path
+ return false;
+ }
+ boolean queued = false;
+ synchronized (pendingForwards) {
+ // double check the flow objective store, because this block could run
+ // after a notification arrives
+ if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
+ pendingForwards.compute(fwd.nextId(), (id, pending) -> {
+ PendingNext next = new PendingNext(deviceId, fwd);
+ if (pending == null) {
+ return Sets.newHashSet(next);
+ } else {
+ pending.add(next);
+ return pending;
+ }
+ });
+ queued = true;
+ }
+ }
+ if (queued) {
+ log.debug("Queued forwarding objective {} for nextId {} meant for device {}",
+ fwd.id(), fwd.nextId(), deviceId);
+ }
+ return queued;
+ }
+
+ /**
+ * Task that passes the flow objective down to the driver. The task will
+ * make a few attempts to find the appropriate driver, then eventually give
+ * up and report an error if no suitable driver could be found.
+ */
+ private class ObjectiveInstaller implements Runnable {
+ private final DeviceId deviceId;
+ private final Objective objective;
+
+ private final int numAttempts;
+
+ public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
+ this(deviceId, objective, 1);
+ }
+
+ public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
+ this.deviceId = checkNotNull(deviceId);
+ this.objective = checkNotNull(objective);
+ this.numAttempts = checkNotNull(attemps);
+ }
+
+ @Override
+ public void run() {
+ try {
+ Pipeliner pipeliner = getDevicePipeliner(deviceId);
+
+ if (pipeliner != null) {
+ if (objective instanceof NextObjective) {
+ pipeliner.next((NextObjective) objective);
+ } else if (objective instanceof ForwardingObjective) {
+ pipeliner.forward((ForwardingObjective) objective);
+ } else {
+ pipeliner.filter((FilteringObjective) objective);
+ }
+ //Attempts to check if pipeliner is null for retry attempts
+ } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
+ Thread.sleep(INSTALL_RETRY_INTERVAL);
+ executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
+ } else {
+ // Otherwise we've tried a few times and failed, report an
+ // error back to the user.
+ objective.context().ifPresent(
+ c -> c.onError(objective, ObjectiveError.NOPIPELINER));
+ }
+ //Excpetion thrown
+ } catch (Exception e) {
+ log.warn("Exception while installing flow objective", e);
+ }
+ }
+ }
+
+ private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
+ @Override
+ public void notify(ObjectiveEvent event) {
+ if (event.type() == ObjectiveEvent.Type.ADD) {
+ log.debug("Received notification of obj event {}", event);
+ Set<PendingNext> pending;
+ synchronized (pendingForwards) {
+ // needs to be synchronized for queueObjective lookup
+ pending = pendingForwards.remove(event.subject());
+ }
+
+ if (pending == null) {
+ log.debug("Nothing pending for this obj event {}", event);
+ return;
+ }
+
+ log.debug("Processing {} pending forwarding objectives for nextId {}",
+ pending.size(), event.subject());
+ pending.forEach(p -> getDevicePipeliner(p.deviceId())
+ .forward(p.forwardingObjective()));
+ }
+ }
+ }
+
+ /**
+ * Retrieves (if it exists) the device pipeline behaviour from the cache.
+ * Otherwise it warms the caches and triggers the init method of the Pipeline.
+ * For virtual network, it returns OVS pipeliner.
+ *
+ * @param deviceId the id of the device associated to the pipeline
+ * @return the implementation of the Pipeliner behaviour
+ */
+ private Pipeliner getDevicePipeliner(DeviceId deviceId) {
+ return pipeliners.computeIfAbsent(deviceId, this::initPipelineHandler);
+ }
+
+ /**
+ * Creates and initialize {@link Pipeliner}.
+ * <p>
+ * Note: Expected to be called under per-Device lock.
+ * e.g., {@code pipeliners}' Map#compute family methods
+ *
+ * @param deviceId Device to initialize pipeliner
+ * @return {@link Pipeliner} instance or null
+ */
+ private Pipeliner initPipelineHandler(DeviceId deviceId) {
+ //FIXME: do we need a standard pipeline for virtual device?
+ Pipeliner pipeliner = new DefaultVirtualDevicePipeline();
+ pipeliner.init(deviceId, context);
+ return pipeliner;
+ }
+
+ // Processing context for initializing pipeline driver behaviours.
+ private class InnerPipelineContext implements PipelinerContext {
+ public ServiceDirectory directory() {
+ return serviceDirectory;
+ }
+
+ public FlowObjectiveStore store() {
+ return flowObjectiveStore;
+ }
+ }
+
+ /**
+ * Data class used to hold a pending forwarding objective that could not
+ * be processed because the associated next object was not present.
+ */
+ private class PendingNext {
+ private final DeviceId deviceId;
+ private final ForwardingObjective fwd;
+
+ public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
+ this.deviceId = deviceId;
+ this.fwd = fwd;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public ForwardingObjective forwardingObjective() {
+ return fwd;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(deviceId, fwd);
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof PendingNext)) {
+ return false;
+ }
+ final PendingNext other = (PendingNext) obj;
+ if (this.deviceId.equals(other.deviceId) &&
+ this.fwd.equals(other.fwd)) {
+ return true;
+ }
+ return false;
+ }
+ }
+
+ /**
+ * This class is a wrapping class from VirtualNetworkFlowObjectiveStore
+ * to FlowObjectiveStore for PipelinerContext.
+ */
+ private class StoreConvertor implements FlowObjectiveStore {
+
+ @Override
+ public void setDelegate(FlowObjectiveStoreDelegate delegate) {
+ virtualFlowObjectiveStore.setDelegate(networkId(), delegate);
+ }
+
+ @Override
+ public void unsetDelegate(FlowObjectiveStoreDelegate delegate) {
+ virtualFlowObjectiveStore.unsetDelegate(networkId(), delegate);
+ }
+
+ @Override
+ public boolean hasDelegate() {
+ return virtualFlowObjectiveStore.hasDelegate(networkId());
+ }
+
+ @Override
+ public void putNextGroup(Integer nextId, NextGroup group) {
+ virtualFlowObjectiveStore.putNextGroup(networkId(), nextId, group);
+ }
+
+ @Override
+ public NextGroup getNextGroup(Integer nextId) {
+ return virtualFlowObjectiveStore.getNextGroup(networkId(), nextId);
+ }
+
+ @Override
+ public NextGroup removeNextGroup(Integer nextId) {
+ return virtualFlowObjectiveStore.removeNextGroup(networkId(), nextId);
+ }
+
+ @Override
+ public Map<Integer, NextGroup> getAllGroups() {
+ return virtualFlowObjectiveStore.getAllGroups(networkId());
+ }
+
+ @Override
+ public int allocateNextId() {
+ return virtualFlowObjectiveStore.allocateNextId(networkId());
+ }
+ }
+
+ /**
+ * Simple single table pipeline abstraction for virtual networks.
+ */
+ private class DefaultVirtualDevicePipeline
+ extends AbstractHandlerBehaviour implements Pipeliner {
+
+ private final Logger log = getLogger(getClass());
+
+ private DeviceId deviceId;
+
+ private Cache<Integer, NextObjective> pendingNext;
+
+ private KryoNamespace appKryo = new KryoNamespace.Builder()
+ .register(GroupKey.class)
+ .register(DefaultGroupKey.class)
+ .register(SingleGroup.class)
+ .register(byte[].class)
+ .build("DefaultVirtualDevicePipeline");
+
+ @Override
+ public void init(DeviceId deviceId, PipelinerContext context) {
+ this.deviceId = deviceId;
+
+ pendingNext = CacheBuilder.newBuilder()
+ .expireAfterWrite(20, TimeUnit.SECONDS)
+ .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
+ if (notification.getCause() == RemovalCause.EXPIRED) {
+ notification.getValue().context()
+ .ifPresent(c -> c.onError(notification.getValue(),
+ ObjectiveError.FLOWINSTALLATIONFAILED));
+ }
+ }).build();
+ }
+
+ @Override
+ public void filter(FilteringObjective filter) {
+
+ TrafficTreatment.Builder actions;
+ switch (filter.type()) {
+ case PERMIT:
+ actions = (filter.meta() == null) ?
+ DefaultTrafficTreatment.builder().punt() :
+ DefaultTrafficTreatment.builder(filter.meta());
+ break;
+ case DENY:
+ actions = (filter.meta() == null) ?
+ DefaultTrafficTreatment.builder() :
+ DefaultTrafficTreatment.builder(filter.meta());
+ actions.drop();
+ break;
+ default:
+ log.warn("Unknown filter type: {}", filter.type());
+ actions = DefaultTrafficTreatment.builder().drop();
+ }
+
+ TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
+
+ filter.conditions().forEach(selector::add);
+
+ if (filter.key() != null) {
+ selector.add(filter.key());
+ }
+
+ FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
+ .forDevice(deviceId)
+ .withSelector(selector.build())
+ .withTreatment(actions.build())
+ .fromApp(filter.appId())
+ .withPriority(filter.priority());
+
+ if (filter.permanent()) {
+ ruleBuilder.makePermanent();
+ } else {
+ ruleBuilder.makeTemporary(filter.timeout());
+ }
+
+ installObjective(ruleBuilder, filter);
+ }
+
+ @Override
+ public void forward(ForwardingObjective fwd) {
+ TrafficSelector selector = fwd.selector();
+
+ if (fwd.treatment() != null) {
+ // Deal with SPECIFIC and VERSATILE in the same manner.
+ FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
+ .forDevice(deviceId)
+ .withSelector(selector)
+ .fromApp(fwd.appId())
+ .withPriority(fwd.priority())
+ .withTreatment(fwd.treatment());
+
+ if (fwd.permanent()) {
+ ruleBuilder.makePermanent();
+ } else {
+ ruleBuilder.makeTemporary(fwd.timeout());
+ }
+ installObjective(ruleBuilder, fwd);
+
+ } else {
+ NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId());
+ if (nextObjective != null) {
+ pendingNext.invalidate(fwd.nextId());
+ nextObjective.next().forEach(treat -> {
+ FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
+ .forDevice(deviceId)
+ .withSelector(selector)
+ .fromApp(fwd.appId())
+ .withPriority(fwd.priority())
+ .withTreatment(treat);
+
+ if (fwd.permanent()) {
+ ruleBuilder.makePermanent();
+ } else {
+ ruleBuilder.makeTemporary(fwd.timeout());
+ }
+ installObjective(ruleBuilder, fwd);
+ });
+ } else {
+ fwd.context().ifPresent(c -> c.onError(fwd,
+ ObjectiveError.GROUPMISSING));
+ }
+ }
+ }
+
+ private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
+ FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
+ switch (objective.op()) {
+
+ case ADD:
+ flowBuilder.add(ruleBuilder.build());
+ break;
+ case REMOVE:
+ flowBuilder.remove(ruleBuilder.build());
+ break;
+ default:
+ log.warn("Unknown operation {}", objective.op());
+ }
+
+ flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
+ @Override
+ public void onSuccess(FlowRuleOperations ops) {
+ objective.context().ifPresent(context -> context.onSuccess(objective));
+ }
+
+ @Override
+ public void onError(FlowRuleOperations ops) {
+ objective.context()
+ .ifPresent(context ->
+ context.onError(objective,
+ ObjectiveError.FLOWINSTALLATIONFAILED));
+ }
+ }));
+ }
+
+ @Override
+ public void next(NextObjective nextObjective) {
+
+ pendingNext.put(nextObjective.id(), nextObjective);
+ flowObjectiveStore.putNextGroup(nextObjective.id(),
+ new SingleGroup(
+ new DefaultGroupKey(
+ appKryo.serialize(nextObjective.id()))));
+ nextObjective.context().ifPresent(context -> context.onSuccess(nextObjective));
+ }
+
+ @Override
+ public List<String> getNextMappings(NextGroup nextGroup) {
+ // Default single table pipeline does not use nextObjectives or groups
+ return null;
+ }
+
+ private class SingleGroup implements NextGroup {
+
+ private final GroupKey key;
+
+ public SingleGroup(GroupKey key) {
+ this.key = key;
+ }
+
+ public GroupKey key() {
+ return key;
+ }
+
+ @Override
+ public byte[] data() {
+ return appKryo.serialize(key);
+ }
+ }
+ }
+}
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManager.java
index d50cc47..b1316a1 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManager.java
@@ -58,6 +58,7 @@
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.group.GroupService;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
import org.onosproject.net.host.HostService;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentListener;
@@ -425,6 +426,8 @@
service = new VirtualNetworkPacketManager(this, network.id());
} else if (serviceKey.serviceClass.equals(GroupService.class)) {
service = new VirtualNetworkGroupManager(this, network.id());
+ } else if (serviceKey.serviceClass.equals(FlowObjectiveService.class)) {
+ service = new VirtualNetworkFlowObjectiveManager(this, network.id());
} else {
return null;
}
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManagerTest.java
new file mode 100644
index 0000000..2ca243b
--- /dev/null
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManagerTest.java
@@ -0,0 +1,198 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.net.virtual.impl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.junit.TestTools;
+import org.onlab.junit.TestUtils;
+import org.onlab.osgi.ServiceDirectory;
+import org.onlab.osgi.TestServiceDirectory;
+import org.onosproject.TestApplicationId;
+import org.onosproject.common.event.impl.TestEventDispatcher;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetwork;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
+import org.onosproject.incubator.net.virtual.VirtualNetworkStore;
+import org.onosproject.incubator.net.virtual.event.VirtualEvent;
+import org.onosproject.incubator.net.virtual.event.VirtualListenerRegistryManager;
+import org.onosproject.incubator.net.virtual.impl.provider.VirtualProviderManager;
+import org.onosproject.incubator.net.virtual.provider.AbstractVirtualProvider;
+import org.onosproject.incubator.net.virtual.provider.VirtualFlowRuleProvider;
+import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
+import org.onosproject.incubator.store.virtual.impl.DistributedVirtualNetworkStore;
+import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowObjectiveStore;
+import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowRuleStore;
+import org.onosproject.net.NetTestTools;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.intent.FakeIntentManager;
+import org.onosproject.net.intent.TestableIntentService;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.TestStorageService;
+
+import static org.junit.Assert.*;
+
+/**
+ * Junit tests for VirtualNetworkFlowObjectiveManager.
+ */
+public class VirtualNetworkFlowObjectiveManagerTest
+ extends VirtualNetworkTestUtil {
+
+ private static final int RETRY_MS = 250;
+
+ private VirtualNetworkManager manager;
+ private DistributedVirtualNetworkStore virtualNetworkManagerStore;
+ private TestableIntentService intentService = new FakeIntentManager();
+ private ServiceDirectory testDirectory;
+ private SimpleVirtualFlowObjectiveStore flowObjectiveStore;
+
+ private VirtualProviderManager providerRegistryService;
+ private EventDeliveryService eventDeliveryService;
+ VirtualListenerRegistryManager listenerRegistryManager =
+ VirtualListenerRegistryManager.getInstance();
+
+ private ApplicationId appId;
+
+ private VirtualNetwork vnet1;
+ private VirtualNetwork vnet2;
+
+ private FlowObjectiveService service1;
+ private FlowObjectiveService service2;
+
+ //FIXME: referring flowrule service, store, and provider shouldn't be here
+ private VirtualFlowRuleProvider flowRuleProvider = new TestProvider();
+ private SimpleVirtualFlowRuleStore flowRuleStore;
+
+ @Before
+ public void setUp() throws Exception {
+ virtualNetworkManagerStore = new DistributedVirtualNetworkStore();
+
+ CoreService coreService = new TestCoreService();
+ TestUtils.setField(virtualNetworkManagerStore, "coreService", coreService);
+ StorageService storageService = new TestStorageService();
+ TestUtils.setField(virtualNetworkManagerStore, "storageService", storageService);
+ virtualNetworkManagerStore.activate();
+
+ flowObjectiveStore = new SimpleVirtualFlowObjectiveStore();
+ TestUtils.setField(flowObjectiveStore, "storageService", storageService);
+ flowObjectiveStore.activate();
+ flowRuleStore = new SimpleVirtualFlowRuleStore();
+ flowRuleStore.activate();
+
+ manager = new VirtualNetworkManager();
+ manager.store = virtualNetworkManagerStore;
+ manager.intentService = intentService;
+ TestUtils.setField(manager, "coreService", coreService);
+
+ providerRegistryService = new VirtualProviderManager();
+ providerRegistryService.registerProvider(flowRuleProvider);
+
+ eventDeliveryService = new TestEventDispatcher();
+ NetTestTools.injectEventDispatcher(manager, eventDeliveryService);
+ eventDeliveryService.addSink(VirtualEvent.class, listenerRegistryManager);
+
+ appId = new TestApplicationId("FlowRuleManagerTest");
+
+ testDirectory = new TestServiceDirectory()
+ .add(VirtualNetworkStore.class, virtualNetworkManagerStore)
+ .add(CoreService.class, coreService)
+ .add(EventDeliveryService.class, eventDeliveryService)
+ .add(VirtualProviderRegistryService.class, providerRegistryService)
+ .add(VirtualNetworkFlowRuleStore.class, flowRuleStore)
+ .add(VirtualNetworkFlowObjectiveStore.class, flowObjectiveStore);
+ TestUtils.setField(manager, "serviceDirectory", testDirectory);
+
+ manager.activate();
+
+ vnet1 = setupVirtualNetworkTopology(manager, TID1);
+ vnet2 = setupVirtualNetworkTopology(manager, TID2);
+
+ service1 = new VirtualNetworkFlowObjectiveManager(manager, vnet1.id());
+ service2 = new VirtualNetworkFlowObjectiveManager(manager, vnet2.id());
+ }
+
+ @After
+ public void tearDownTest() {
+ manager.deactivate();
+ virtualNetworkManagerStore.deactivate();
+ }
+
+ /**
+ * Tests adding a forwarding objective.
+ */
+ @Test
+ public void forwardingObjective() {
+ TrafficSelector selector = DefaultTrafficSelector.emptySelector();
+ TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
+ ForwardingObjective forward =
+ DefaultForwardingObjective.builder()
+ .fromApp(NetTestTools.APP_ID)
+ .withFlag(ForwardingObjective.Flag.SPECIFIC)
+ .withSelector(selector)
+ .withTreatment(treatment)
+ .makePermanent()
+ .add();
+
+ service1.forward(VDID1, forward);
+
+ TestTools.assertAfter(RETRY_MS, () ->
+ assertEquals("1 flowrule entry expected",
+ 1, flowRuleStore.getFlowRuleCount(vnet1.id())));
+ TestTools.assertAfter(RETRY_MS, () ->
+ assertEquals("0 flowrule entry expected",
+ 0, flowRuleStore.getFlowRuleCount(vnet2.id())));
+ }
+
+ //TODO: More test cases for filter, foward, and next
+
+ private class TestProvider extends AbstractVirtualProvider
+ implements VirtualFlowRuleProvider {
+
+ protected TestProvider() {
+ super(new ProviderId("test", "org.onosproject.virtual.testprovider"));
+ }
+
+ @Override
+ public void applyFlowRule(NetworkId networkId, FlowRule... flowRules) {
+
+ }
+
+ @Override
+ public void removeFlowRule(NetworkId networkId, FlowRule... flowRules) {
+
+ }
+
+ @Override
+ public void executeBatch(NetworkId networkId, FlowRuleBatchOperation batch) {
+
+ }
+ }
+}
\ No newline at end of file
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java
index 3cf0457..2d125d1 100644
--- a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java
@@ -40,6 +40,7 @@
import org.onosproject.incubator.net.virtual.VirtualLink;
import org.onosproject.incubator.net.virtual.VirtualNetwork;
import org.onosproject.incubator.net.virtual.VirtualNetworkEvent;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
import org.onosproject.incubator.net.virtual.VirtualNetworkGroupStore;
import org.onosproject.incubator.net.virtual.VirtualNetworkIntent;
@@ -54,6 +55,7 @@
import org.onosproject.incubator.net.virtual.provider.VirtualNetworkProviderService;
import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
import org.onosproject.incubator.store.virtual.impl.DistributedVirtualNetworkStore;
+import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowObjectiveStore;
import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowRuleStore;
import org.onosproject.incubator.store.virtual.impl.SimpleVirtualGroupStore;
import org.onosproject.incubator.store.virtual.impl.SimpleVirtualPacketStore;
@@ -833,7 +835,8 @@
.add(ClusterService.class, new ClusterServiceAdapter())
.add(VirtualNetworkFlowRuleStore.class, new SimpleVirtualFlowRuleStore())
.add(VirtualNetworkPacketStore.class, new SimpleVirtualPacketStore())
- .add(VirtualNetworkGroupStore.class, new SimpleVirtualGroupStore());
+ .add(VirtualNetworkGroupStore.class, new SimpleVirtualGroupStore())
+ .add(VirtualNetworkFlowObjectiveStore.class, new SimpleVirtualFlowObjectiveStore());
validateServiceGetReturnsSavedInstance(virtualNetwork.id(), FlowRuleService.class);
validateServiceGetReturnsSavedInstance(virtualNetwork.id(), PacketService.class);
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerTest.java
index 1a49e22..1cedd4a 100644
--- a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerTest.java
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerTest.java
@@ -33,13 +33,18 @@
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.incubator.net.virtual.VirtualDevice;
import org.onosproject.incubator.net.virtual.VirtualNetwork;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
import org.onosproject.incubator.net.virtual.VirtualNetworkPacketStore;
import org.onosproject.incubator.net.virtual.VirtualNetworkStore;
import org.onosproject.incubator.net.virtual.impl.provider.VirtualProviderManager;
import org.onosproject.incubator.net.virtual.provider.AbstractVirtualProvider;
+import org.onosproject.incubator.net.virtual.provider.VirtualFlowRuleProvider;
import org.onosproject.incubator.net.virtual.provider.VirtualPacketProvider;
import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
import org.onosproject.incubator.store.virtual.impl.DistributedVirtualNetworkStore;
+import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowObjectiveStore;
+import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowRuleStore;
import org.onosproject.incubator.store.virtual.impl.SimpleVirtualPacketStore;
import org.onosproject.net.DeviceId;
import org.onosproject.net.NetTestTools;
@@ -49,6 +54,8 @@
import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.intent.FakeIntentManager;
import org.onosproject.net.intent.TestableIntentService;
import org.onosproject.net.packet.DefaultOutboundPacket;
@@ -57,6 +64,7 @@
import org.onosproject.net.packet.PacketPriority;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.TestStorageService;
import java.nio.ByteBuffer;
@@ -95,12 +103,17 @@
private ApplicationId appId = new TestApplicationId("VirtualPacketManagerTest");
+ private VirtualFlowRuleProvider flowRuleProvider = new TestFlowRuleProvider();
+ private SimpleVirtualFlowRuleStore flowRuleStore;
+ private SimpleVirtualFlowObjectiveStore flowObjectiveStore;
+
@Before
public void setUp() throws TestUtils.TestUtilsException {
virtualNetworkManagerStore = new DistributedVirtualNetworkStore();
TestUtils.setField(virtualNetworkManagerStore, "coreService", coreService);
- TestUtils.setField(virtualNetworkManagerStore, "storageService", new TestStorageService());
+ StorageService storageService = new TestStorageService();
+ TestUtils.setField(virtualNetworkManagerStore, "storageService", storageService);
virtualNetworkManagerStore.activate();
manager = new VirtualNetworkManager();
@@ -109,8 +122,15 @@
manager.intentService = intentService;
NetTestTools.injectEventDispatcher(manager, new TestEventDispatcher());
+ flowObjectiveStore = new SimpleVirtualFlowObjectiveStore();
+ TestUtils.setField(flowObjectiveStore, "storageService", storageService);
+ flowObjectiveStore.activate();
+ flowRuleStore = new SimpleVirtualFlowRuleStore();
+ flowRuleStore.activate();
+
providerRegistryService = new VirtualProviderManager();
providerRegistryService.registerProvider(provider);
+ providerRegistryService.registerProvider(flowRuleProvider);
testDirectory = new TestServiceDirectory()
.add(VirtualNetworkStore.class, virtualNetworkManagerStore)
@@ -118,6 +138,8 @@
.add(VirtualProviderRegistryService.class, providerRegistryService)
.add(EventDeliveryService.class, eventDeliveryService)
.add(ClusterService.class, new ClusterServiceAdapter())
+ .add(VirtualNetworkFlowRuleStore.class, flowRuleStore)
+ .add(VirtualNetworkFlowObjectiveStore.class, flowObjectiveStore)
.add(VirtualNetworkPacketStore.class, packetStore);
TestUtils.setField(manager, "serviceDirectory", testDirectory);
@@ -373,4 +395,27 @@
return false;
}
}
+
+ private class TestFlowRuleProvider extends AbstractVirtualProvider
+ implements VirtualFlowRuleProvider {
+
+ protected TestFlowRuleProvider() {
+ super(new ProviderId("test", "org.onosproject.virtual.testprovider"));
+ }
+
+ @Override
+ public void applyFlowRule(NetworkId networkId, FlowRule... flowRules) {
+
+ }
+
+ @Override
+ public void removeFlowRule(NetworkId networkId, FlowRule... flowRules) {
+
+ }
+
+ @Override
+ public void executeBatch(NetworkId networkId, FlowRuleBatchOperation batch) {
+
+ }
+ }
}
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowObjectiveStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowObjectiveStore.java
new file mode 100644
index 0000000..1ac5396
--- /dev/null
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowObjectiveStore.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.store.virtual.impl;
+
+import com.google.common.collect.Maps;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
+import org.onosproject.net.behaviour.DefaultNextGroup;
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
+import org.onosproject.net.flowobjective.ObjectiveEvent;
+import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Single instance implementation of store to manage
+ * the inventory of created next groups for virtual network.
+ */
+@Component(immediate = true)
+@Service
+public class SimpleVirtualFlowObjectiveStore
+ extends AbstractVirtualStore<ObjectiveEvent, FlowObjectiveStoreDelegate>
+ implements VirtualNetworkFlowObjectiveStore {
+
+ private final Logger log = getLogger(getClass());
+
+ private ConcurrentMap<NetworkId, ConcurrentMap<Integer, byte[]>> nextGroupsMap;
+
+ private AtomicCounter nextIds;
+
+ // event queue to separate map-listener threads from event-handler threads (tpool)
+ private BlockingQueue<VirtualObjectiveEvent> eventQ;
+ private ExecutorService tpool;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Activate
+ public void activate() {
+ tpool = Executors.newFixedThreadPool(4, groupedThreads("onos/virtual/flobj-notifier", "%d", log));
+ eventQ = new LinkedBlockingQueue<>();
+ tpool.execute(new FlowObjectiveNotifier());
+
+ nextGroupsMap = Maps.newConcurrentMap();
+
+ nextIds = storageService.getAtomicCounter("next-objective-counter");
+ log.info("Started");
+ }
+
+ private ConcurrentMap<Integer, byte[]> getNextGroups(NetworkId networkId) {
+ nextGroupsMap.computeIfAbsent(networkId, n -> Maps.newConcurrentMap());
+ return nextGroupsMap.get(networkId);
+ }
+
+ @Override
+ public void putNextGroup(NetworkId networkId, Integer nextId, NextGroup group) {
+ ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
+ nextGroups.put(nextId, group.data());
+
+ eventQ.add(new VirtualObjectiveEvent(networkId, ObjectiveEvent.Type.ADD, nextId));
+ }
+
+ @Override
+ public NextGroup getNextGroup(NetworkId networkId, Integer nextId) {
+ ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
+ return new DefaultNextGroup(nextGroups.get(nextId));
+ }
+
+ @Override
+ public NextGroup removeNextGroup(NetworkId networkId, Integer nextId) {
+ ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
+ eventQ.add(new VirtualObjectiveEvent(networkId, ObjectiveEvent.Type.REMOVE, nextId));
+ return new DefaultNextGroup(nextGroups.remove(nextId));
+ }
+
+ @Override
+ public Map<Integer, NextGroup> getAllGroups(NetworkId networkId) {
+ ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
+
+ Map<Integer, NextGroup> nextGroupMappings = new HashMap<>();
+ for (int key : nextGroups.keySet()) {
+ NextGroup nextGroup = getNextGroup(networkId, key);
+ if (nextGroup != null) {
+ nextGroupMappings.put(key, nextGroup);
+ }
+ }
+ return nextGroupMappings;
+ }
+
+ @Override
+ public int allocateNextId(NetworkId networkId) {
+ return (int) nextIds.incrementAndGet();
+ }
+
+ private class FlowObjectiveNotifier implements Runnable {
+ @Override
+ public void run() {
+ try {
+ while (!Thread.currentThread().isInterrupted()) {
+ VirtualObjectiveEvent vEvent = eventQ.take();
+ notifyDelegate(vEvent.networkId(), vEvent);
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private class VirtualObjectiveEvent extends ObjectiveEvent {
+ NetworkId networkId;
+
+ public VirtualObjectiveEvent(NetworkId networkId, Type type,
+ Integer objective) {
+ super(type, objective);
+ this.networkId = networkId;
+ }
+
+ NetworkId networkId() {
+ return networkId;
+ }
+ }
+}
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowRuleStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowRuleStore.java
index 5f71b57..0952574 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowRuleStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowRuleStore.java
@@ -151,8 +151,12 @@
@Override
public int getFlowRuleCount(NetworkId networkId) {
-
int sum = 0;
+
+ if (flowEntries.get(networkId) == null) {
+ return 0;
+ }
+
for (ConcurrentMap<FlowId, List<StoredFlowEntry>> ft :
flowEntries.get(networkId).values()) {
for (List<StoredFlowEntry> fes : ft.values()) {