[ONOS-5936] (vCore) Virtual FlowObjective Manager and Store

Changes
1. FlowObjective manager for virtual network is added
2. VirtualFlowObjective store is added
3. SimpleVirtualFlowObjectiveStore is implementation
4. Unit tests are added

Change-Id: I18ff1d440d1f85ca96fff36a33a8b67566031e2c
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkFlowObjectiveStore.java b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkFlowObjectiveStore.java
new file mode 100644
index 0000000..7340c13
--- /dev/null
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkFlowObjectiveStore.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.net.virtual;
+
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
+import org.onosproject.net.flowobjective.ObjectiveEvent;
+
+import java.util.Map;
+
+/**
+ * The flow objective store for virtual networks.
+ */
+public interface VirtualNetworkFlowObjectiveStore
+        extends VirtualStore<ObjectiveEvent, FlowObjectiveStoreDelegate> {
+
+    /**
+     * Adds a NextGroup to the store, by mapping it to the nextId as key,
+     * and replacing any previous mapping.
+     *
+     * @param networkId a virtual network identifier
+     * @param nextId an integer
+     * @param group a next group opaque object
+     */
+    void putNextGroup(NetworkId networkId, Integer nextId, NextGroup group);
+
+    /**
+     * Fetch a next group from the store.
+     *
+     * @param networkId a virtual network identifier
+     * @param nextId an integer used as key
+     * @return a next group, or null if group was not found
+     */
+    NextGroup getNextGroup(NetworkId networkId, Integer nextId);
+
+    /**
+     * Remove a next group mapping from the store.
+     *
+     * @param networkId a virtual network identifier
+     * @param nextId  the key to remove from the store.
+     * @return the next group which mapped to the nextId and is now removed, or
+     *          null if no group mapping existed in the store
+     */
+    NextGroup removeNextGroup(NetworkId networkId, Integer nextId);
+
+    /**
+     * Fetch all groups from the store and their mapping to nextIds.
+     *
+     * @param networkId a virtual network identifier
+     * @return a map that represents the current snapshot of Next-ids to NextGroups
+     */
+    Map<Integer, NextGroup> getAllGroups(NetworkId networkId);
+
+    /**
+     * Allocates a next objective id. This id is globally unique.
+     *
+     * @param networkId a virtual network identifier
+     * @return an integer
+     */
+    int allocateNextId(NetworkId networkId);
+}
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java
new file mode 100644
index 0000000..8eba5cf
--- /dev/null
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManager.java
@@ -0,0 +1,628 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.net.virtual.impl;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.onlab.osgi.ServiceDirectory;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.incubator.net.virtual.AbstractVnetService;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
+import org.onosproject.incubator.net.virtual.VirtualNetworkService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.net.behaviour.Pipeliner;
+import org.onosproject.net.behaviour.PipelinerContext;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
+import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.flowobjective.ObjectiveEvent;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.GroupKey;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Provides implementation of the flow objective programming service for virtual networks.
+ */
+// NOTE: This manager is designed to provide flow objective programming service
+// for virtual networks. Actually, virtual networks don't need to consider
+// the different implementation of data-path pipeline. But, the interfaces
+// and usages of flow objective service are still valuable for virtual network.
+// This manager is working as an interpreter from FlowObjective to FlowRules
+// to provide symmetric interfaces with ONOS core services.
+// The behaviours are based on DefaultSingleTablePipeline.
+
+public class VirtualNetworkFlowObjectiveManager extends AbstractVnetService
+        implements FlowObjectiveService {
+
+    public static final int INSTALL_RETRY_ATTEMPTS = 5;
+    public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
+
+    private final Logger log = getLogger(getClass());
+
+    protected DeviceService deviceService;
+
+    // Note: The following dependencies are added on behalf of the pipeline
+    // driver behaviours to assure these services are available for their
+    // initialization.
+    protected FlowRuleService flowRuleService;
+
+    protected VirtualNetworkFlowObjectiveStore virtualFlowObjectiveStore;
+    protected FlowObjectiveStore flowObjectiveStore;
+    private final FlowObjectiveStoreDelegate delegate;
+
+    private final PipelinerContext context = new InnerPipelineContext();
+
+    private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
+    private final Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
+
+    // local store to track which nextObjectives were sent to which device
+    // for debugging purposes
+    private Map<Integer, DeviceId> nextToDevice = Maps.newConcurrentMap();
+
+    private ExecutorService executorService;
+
+    public VirtualNetworkFlowObjectiveManager(VirtualNetworkService manager,
+                                              NetworkId networkId) {
+        super(manager, networkId);
+
+        deviceService = manager.get(networkId(), DeviceService.class);
+        flowRuleService = manager.get(networkId(), FlowRuleService.class);
+
+        executorService = newFixedThreadPool(4, groupedThreads("onos/virtual/objective-installer", "%d", log));
+
+        virtualFlowObjectiveStore =
+                serviceDirectory.get(VirtualNetworkFlowObjectiveStore.class);
+        delegate = new InternalStoreDelegate();
+        virtualFlowObjectiveStore.setDelegate(networkId(), delegate);
+        flowObjectiveStore = new StoreConvertor();
+    }
+
+    @Override
+    public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
+        executorService.execute(new ObjectiveInstaller(deviceId, filteringObjective));
+    }
+
+    @Override
+    public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
+        if (queueObjective(deviceId, forwardingObjective)) {
+            return;
+        }
+        executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
+    }
+
+    @Override
+    public void next(DeviceId deviceId, NextObjective nextObjective) {
+        nextToDevice.put(nextObjective.id(), deviceId);
+        executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
+    }
+
+    @Override
+    public int allocateNextId() {
+        return flowObjectiveStore.allocateNextId();
+    }
+
+    @Override
+    public void initPolicy(String policy) {
+
+    }
+
+    @Override
+    public List<String> getNextMappings() {
+        List<String> mappings = new ArrayList<>();
+        Map<Integer, NextGroup> allnexts = flowObjectiveStore.getAllGroups();
+        // XXX if the NextGroup after de-serialization actually stored info of the deviceId
+        // then info on any nextObj could be retrieved from one controller instance.
+        // Right now the drivers on one instance can only fetch for next-ids that came
+        // to them.
+        // Also, we still need to send the right next-id to the right driver as potentially
+        // there can be different drivers for different devices. But on that account,
+        // no instance should be decoding for another instance's nextIds.
+
+        for (Map.Entry<Integer, NextGroup> e : allnexts.entrySet()) {
+            // get the device this next Objective was sent to
+            DeviceId deviceId = nextToDevice.get(e.getKey());
+            mappings.add("NextId " + e.getKey() + ": " +
+                                 ((deviceId != null) ? deviceId : "nextId not in this onos instance"));
+            if (deviceId != null) {
+                // this instance of the controller sent the nextObj to a driver
+                Pipeliner pipeliner = getDevicePipeliner(deviceId);
+                List<String> nextMappings = pipeliner.getNextMappings(e.getValue());
+                if (nextMappings != null) {
+                    mappings.addAll(nextMappings);
+                }
+            }
+        }
+        return mappings;
+    }
+
+    @Override
+    public List<String> getPendingNexts() {
+        List<String> pendingNexts = new ArrayList<>();
+        for (Integer nextId : pendingForwards.keySet()) {
+            Set<PendingNext> pnext = pendingForwards.get(nextId);
+            StringBuilder pend = new StringBuilder();
+            pend.append("Next Id: ").append(Integer.toString(nextId))
+                    .append(" :: ");
+            for (PendingNext pn : pnext) {
+                pend.append(Integer.toString(pn.forwardingObjective().id()))
+                        .append(" ");
+            }
+            pendingNexts.add(pend.toString());
+        }
+        return pendingNexts;
+    }
+
+    private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
+        if (fwd.nextId() == null ||
+                flowObjectiveStore.getNextGroup(fwd.nextId()) != null) {
+            // fast path
+            return false;
+        }
+        boolean queued = false;
+        synchronized (pendingForwards) {
+            // double check the flow objective store, because this block could run
+            // after a notification arrives
+            if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
+                pendingForwards.compute(fwd.nextId(), (id, pending) -> {
+                    PendingNext next = new PendingNext(deviceId, fwd);
+                    if (pending == null) {
+                        return Sets.newHashSet(next);
+                    } else {
+                        pending.add(next);
+                        return pending;
+                    }
+                });
+                queued = true;
+            }
+        }
+        if (queued) {
+            log.debug("Queued forwarding objective {} for nextId {} meant for device {}",
+                      fwd.id(), fwd.nextId(), deviceId);
+        }
+        return queued;
+    }
+
+    /**
+     * Task that passes the flow objective down to the driver. The task will
+     * make a few attempts to find the appropriate driver, then eventually give
+     * up and report an error if no suitable driver could be found.
+     */
+    private class ObjectiveInstaller implements Runnable {
+        private final DeviceId deviceId;
+        private final Objective objective;
+
+        private final int numAttempts;
+
+        public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
+            this(deviceId, objective, 1);
+        }
+
+        public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
+            this.deviceId = checkNotNull(deviceId);
+            this.objective = checkNotNull(objective);
+            this.numAttempts = checkNotNull(attemps);
+        }
+
+        @Override
+        public void run() {
+            try {
+                Pipeliner pipeliner = getDevicePipeliner(deviceId);
+
+                if (pipeliner != null) {
+                    if (objective instanceof NextObjective) {
+                        pipeliner.next((NextObjective) objective);
+                    } else if (objective instanceof ForwardingObjective) {
+                        pipeliner.forward((ForwardingObjective) objective);
+                    } else {
+                        pipeliner.filter((FilteringObjective) objective);
+                    }
+                    //Attempts to check if pipeliner is null for retry attempts
+                } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
+                    Thread.sleep(INSTALL_RETRY_INTERVAL);
+                    executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
+                } else {
+                    // Otherwise we've tried a few times and failed, report an
+                    // error back to the user.
+                    objective.context().ifPresent(
+                            c -> c.onError(objective, ObjectiveError.NOPIPELINER));
+                }
+                //Excpetion thrown
+            } catch (Exception e) {
+                log.warn("Exception while installing flow objective", e);
+            }
+        }
+    }
+
+    private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
+        @Override
+        public void notify(ObjectiveEvent event) {
+            if (event.type() == ObjectiveEvent.Type.ADD) {
+                log.debug("Received notification of obj event {}", event);
+                Set<PendingNext> pending;
+                synchronized (pendingForwards) {
+                    // needs to be synchronized for queueObjective lookup
+                    pending = pendingForwards.remove(event.subject());
+                }
+
+                if (pending == null) {
+                    log.debug("Nothing pending for this obj event {}", event);
+                    return;
+                }
+
+                log.debug("Processing {} pending forwarding objectives for nextId {}",
+                          pending.size(), event.subject());
+                pending.forEach(p -> getDevicePipeliner(p.deviceId())
+                        .forward(p.forwardingObjective()));
+            }
+        }
+    }
+
+    /**
+     * Retrieves (if it exists) the device pipeline behaviour from the cache.
+     * Otherwise it warms the caches and triggers the init method of the Pipeline.
+     * For virtual network, it returns OVS pipeliner.
+     *
+     * @param deviceId the id of the device associated to the pipeline
+     * @return the implementation of the Pipeliner behaviour
+     */
+    private Pipeliner getDevicePipeliner(DeviceId deviceId) {
+        return pipeliners.computeIfAbsent(deviceId, this::initPipelineHandler);
+    }
+
+    /**
+     * Creates and initialize {@link Pipeliner}.
+     * <p>
+     * Note: Expected to be called under per-Device lock.
+     *      e.g., {@code pipeliners}' Map#compute family methods
+     *
+     * @param deviceId Device to initialize pipeliner
+     * @return {@link Pipeliner} instance or null
+     */
+    private Pipeliner initPipelineHandler(DeviceId deviceId) {
+        //FIXME: do we need a standard pipeline for virtual device?
+        Pipeliner pipeliner = new DefaultVirtualDevicePipeline();
+        pipeliner.init(deviceId, context);
+        return pipeliner;
+    }
+
+    // Processing context for initializing pipeline driver behaviours.
+    private class InnerPipelineContext implements PipelinerContext {
+        public ServiceDirectory directory() {
+            return serviceDirectory;
+        }
+
+        public FlowObjectiveStore store() {
+            return flowObjectiveStore;
+        }
+    }
+
+    /**
+     * Data class used to hold a pending forwarding objective that could not
+     * be processed because the associated next object was not present.
+     */
+    private class PendingNext {
+        private final DeviceId deviceId;
+        private final ForwardingObjective fwd;
+
+        public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
+            this.deviceId = deviceId;
+            this.fwd = fwd;
+        }
+
+        public DeviceId deviceId() {
+            return deviceId;
+        }
+
+        public ForwardingObjective forwardingObjective() {
+            return fwd;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(deviceId, fwd);
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof PendingNext)) {
+                return false;
+            }
+            final PendingNext other = (PendingNext) obj;
+            if (this.deviceId.equals(other.deviceId) &&
+                    this.fwd.equals(other.fwd)) {
+                return true;
+            }
+            return false;
+        }
+    }
+
+    /**
+     * This class is a wrapping class from VirtualNetworkFlowObjectiveStore
+     * to FlowObjectiveStore for PipelinerContext.
+     */
+    private class StoreConvertor implements FlowObjectiveStore {
+
+        @Override
+        public void setDelegate(FlowObjectiveStoreDelegate delegate) {
+            virtualFlowObjectiveStore.setDelegate(networkId(), delegate);
+        }
+
+        @Override
+        public void unsetDelegate(FlowObjectiveStoreDelegate delegate) {
+            virtualFlowObjectiveStore.unsetDelegate(networkId(), delegate);
+        }
+
+        @Override
+        public boolean hasDelegate() {
+            return virtualFlowObjectiveStore.hasDelegate(networkId());
+        }
+
+        @Override
+        public void putNextGroup(Integer nextId, NextGroup group) {
+            virtualFlowObjectiveStore.putNextGroup(networkId(), nextId, group);
+        }
+
+        @Override
+        public NextGroup getNextGroup(Integer nextId) {
+            return virtualFlowObjectiveStore.getNextGroup(networkId(), nextId);
+        }
+
+        @Override
+        public NextGroup removeNextGroup(Integer nextId) {
+            return virtualFlowObjectiveStore.removeNextGroup(networkId(), nextId);
+        }
+
+        @Override
+        public Map<Integer, NextGroup> getAllGroups() {
+            return virtualFlowObjectiveStore.getAllGroups(networkId());
+        }
+
+        @Override
+        public int allocateNextId() {
+            return virtualFlowObjectiveStore.allocateNextId(networkId());
+        }
+    }
+
+    /**
+     * Simple single table pipeline abstraction for virtual networks.
+     */
+    private class DefaultVirtualDevicePipeline
+            extends AbstractHandlerBehaviour implements Pipeliner {
+
+        private final Logger log = getLogger(getClass());
+
+        private DeviceId deviceId;
+
+        private Cache<Integer, NextObjective> pendingNext;
+
+        private KryoNamespace appKryo = new KryoNamespace.Builder()
+                .register(GroupKey.class)
+                .register(DefaultGroupKey.class)
+                .register(SingleGroup.class)
+                .register(byte[].class)
+                .build("DefaultVirtualDevicePipeline");
+
+        @Override
+        public void init(DeviceId deviceId, PipelinerContext context) {
+            this.deviceId = deviceId;
+
+            pendingNext = CacheBuilder.newBuilder()
+                    .expireAfterWrite(20, TimeUnit.SECONDS)
+                    .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
+                        if (notification.getCause() == RemovalCause.EXPIRED) {
+                            notification.getValue().context()
+                                    .ifPresent(c -> c.onError(notification.getValue(),
+                                                              ObjectiveError.FLOWINSTALLATIONFAILED));
+                        }
+                    }).build();
+        }
+
+        @Override
+        public void filter(FilteringObjective filter) {
+
+            TrafficTreatment.Builder actions;
+            switch (filter.type()) {
+                case PERMIT:
+                    actions = (filter.meta() == null) ?
+                            DefaultTrafficTreatment.builder().punt() :
+                            DefaultTrafficTreatment.builder(filter.meta());
+                    break;
+                case DENY:
+                    actions = (filter.meta() == null) ?
+                            DefaultTrafficTreatment.builder() :
+                            DefaultTrafficTreatment.builder(filter.meta());
+                    actions.drop();
+                    break;
+                default:
+                    log.warn("Unknown filter type: {}", filter.type());
+                    actions = DefaultTrafficTreatment.builder().drop();
+            }
+
+            TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
+
+            filter.conditions().forEach(selector::add);
+
+            if (filter.key() != null) {
+                selector.add(filter.key());
+            }
+
+            FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
+                    .forDevice(deviceId)
+                    .withSelector(selector.build())
+                    .withTreatment(actions.build())
+                    .fromApp(filter.appId())
+                    .withPriority(filter.priority());
+
+            if (filter.permanent()) {
+                ruleBuilder.makePermanent();
+            } else {
+                ruleBuilder.makeTemporary(filter.timeout());
+            }
+
+            installObjective(ruleBuilder, filter);
+        }
+
+        @Override
+        public void forward(ForwardingObjective fwd) {
+            TrafficSelector selector = fwd.selector();
+
+            if (fwd.treatment() != null) {
+                // Deal with SPECIFIC and VERSATILE in the same manner.
+                FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
+                        .forDevice(deviceId)
+                        .withSelector(selector)
+                        .fromApp(fwd.appId())
+                        .withPriority(fwd.priority())
+                        .withTreatment(fwd.treatment());
+
+                if (fwd.permanent()) {
+                    ruleBuilder.makePermanent();
+                } else {
+                    ruleBuilder.makeTemporary(fwd.timeout());
+                }
+                installObjective(ruleBuilder, fwd);
+
+            } else {
+                NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId());
+                if (nextObjective != null) {
+                    pendingNext.invalidate(fwd.nextId());
+                    nextObjective.next().forEach(treat -> {
+                        FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
+                                .forDevice(deviceId)
+                                .withSelector(selector)
+                                .fromApp(fwd.appId())
+                                .withPriority(fwd.priority())
+                                .withTreatment(treat);
+
+                        if (fwd.permanent()) {
+                            ruleBuilder.makePermanent();
+                        } else {
+                            ruleBuilder.makeTemporary(fwd.timeout());
+                        }
+                        installObjective(ruleBuilder, fwd);
+                    });
+                } else {
+                    fwd.context().ifPresent(c -> c.onError(fwd,
+                                                           ObjectiveError.GROUPMISSING));
+                }
+            }
+        }
+
+        private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
+            FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
+            switch (objective.op()) {
+
+                case ADD:
+                    flowBuilder.add(ruleBuilder.build());
+                    break;
+                case REMOVE:
+                    flowBuilder.remove(ruleBuilder.build());
+                    break;
+                default:
+                    log.warn("Unknown operation {}", objective.op());
+            }
+
+            flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
+                @Override
+                public void onSuccess(FlowRuleOperations ops) {
+                    objective.context().ifPresent(context -> context.onSuccess(objective));
+                }
+
+                @Override
+                public void onError(FlowRuleOperations ops) {
+                    objective.context()
+                            .ifPresent(context ->
+                                               context.onError(objective,
+                                                                  ObjectiveError.FLOWINSTALLATIONFAILED));
+                }
+            }));
+        }
+
+        @Override
+        public void next(NextObjective nextObjective) {
+
+            pendingNext.put(nextObjective.id(), nextObjective);
+            flowObjectiveStore.putNextGroup(nextObjective.id(),
+                                            new SingleGroup(
+                                                    new DefaultGroupKey(
+                                                            appKryo.serialize(nextObjective.id()))));
+            nextObjective.context().ifPresent(context -> context.onSuccess(nextObjective));
+        }
+
+        @Override
+        public List<String> getNextMappings(NextGroup nextGroup) {
+            // Default single table pipeline does not use nextObjectives or groups
+            return null;
+        }
+
+        private class SingleGroup implements NextGroup {
+
+            private final GroupKey key;
+
+            public SingleGroup(GroupKey key) {
+                this.key = key;
+            }
+
+            public GroupKey key() {
+                return key;
+            }
+
+            @Override
+            public byte[] data() {
+                return appKryo.serialize(key);
+            }
+        }
+    }
+}
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManager.java
index d50cc47..b1316a1 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManager.java
@@ -58,6 +58,7 @@
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.group.GroupService;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
 import org.onosproject.net.host.HostService;
 import org.onosproject.net.intent.IntentEvent;
 import org.onosproject.net.intent.IntentListener;
@@ -425,6 +426,8 @@
             service = new VirtualNetworkPacketManager(this, network.id());
         } else if (serviceKey.serviceClass.equals(GroupService.class)) {
             service = new VirtualNetworkGroupManager(this, network.id());
+        } else if (serviceKey.serviceClass.equals(FlowObjectiveService.class)) {
+            service = new VirtualNetworkFlowObjectiveManager(this, network.id());
         } else {
             return null;
         }
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManagerTest.java
new file mode 100644
index 0000000..2ca243b
--- /dev/null
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowObjectiveManagerTest.java
@@ -0,0 +1,198 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.net.virtual.impl;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.junit.TestTools;
+import org.onlab.junit.TestUtils;
+import org.onlab.osgi.ServiceDirectory;
+import org.onlab.osgi.TestServiceDirectory;
+import org.onosproject.TestApplicationId;
+import org.onosproject.common.event.impl.TestEventDispatcher;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetwork;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
+import org.onosproject.incubator.net.virtual.VirtualNetworkStore;
+import org.onosproject.incubator.net.virtual.event.VirtualEvent;
+import org.onosproject.incubator.net.virtual.event.VirtualListenerRegistryManager;
+import org.onosproject.incubator.net.virtual.impl.provider.VirtualProviderManager;
+import org.onosproject.incubator.net.virtual.provider.AbstractVirtualProvider;
+import org.onosproject.incubator.net.virtual.provider.VirtualFlowRuleProvider;
+import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
+import org.onosproject.incubator.store.virtual.impl.DistributedVirtualNetworkStore;
+import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowObjectiveStore;
+import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowRuleStore;
+import org.onosproject.net.NetTestTools;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.intent.FakeIntentManager;
+import org.onosproject.net.intent.TestableIntentService;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.TestStorageService;
+
+import static org.junit.Assert.*;
+
+/**
+ * Junit tests for VirtualNetworkFlowObjectiveManager.
+ */
+public class VirtualNetworkFlowObjectiveManagerTest
+        extends VirtualNetworkTestUtil {
+
+    private static final int RETRY_MS = 250;
+
+    private VirtualNetworkManager manager;
+    private DistributedVirtualNetworkStore virtualNetworkManagerStore;
+    private TestableIntentService intentService = new FakeIntentManager();
+    private ServiceDirectory testDirectory;
+    private SimpleVirtualFlowObjectiveStore flowObjectiveStore;
+
+    private VirtualProviderManager providerRegistryService;
+    private EventDeliveryService eventDeliveryService;
+    VirtualListenerRegistryManager listenerRegistryManager =
+            VirtualListenerRegistryManager.getInstance();
+
+    private ApplicationId appId;
+
+    private VirtualNetwork vnet1;
+    private VirtualNetwork vnet2;
+
+    private FlowObjectiveService service1;
+    private FlowObjectiveService service2;
+
+    //FIXME: referring flowrule service, store, and provider shouldn't be here
+    private VirtualFlowRuleProvider flowRuleProvider = new TestProvider();
+    private SimpleVirtualFlowRuleStore flowRuleStore;
+
+    @Before
+    public void setUp() throws Exception {
+        virtualNetworkManagerStore = new DistributedVirtualNetworkStore();
+
+        CoreService coreService = new TestCoreService();
+        TestUtils.setField(virtualNetworkManagerStore, "coreService", coreService);
+        StorageService storageService = new TestStorageService();
+        TestUtils.setField(virtualNetworkManagerStore, "storageService", storageService);
+        virtualNetworkManagerStore.activate();
+
+        flowObjectiveStore = new SimpleVirtualFlowObjectiveStore();
+        TestUtils.setField(flowObjectiveStore, "storageService", storageService);
+        flowObjectiveStore.activate();
+        flowRuleStore = new SimpleVirtualFlowRuleStore();
+        flowRuleStore.activate();
+
+        manager = new VirtualNetworkManager();
+        manager.store = virtualNetworkManagerStore;
+        manager.intentService = intentService;
+        TestUtils.setField(manager, "coreService", coreService);
+
+        providerRegistryService = new VirtualProviderManager();
+        providerRegistryService.registerProvider(flowRuleProvider);
+
+        eventDeliveryService = new TestEventDispatcher();
+        NetTestTools.injectEventDispatcher(manager, eventDeliveryService);
+        eventDeliveryService.addSink(VirtualEvent.class, listenerRegistryManager);
+
+        appId = new TestApplicationId("FlowRuleManagerTest");
+
+        testDirectory = new TestServiceDirectory()
+                .add(VirtualNetworkStore.class, virtualNetworkManagerStore)
+                .add(CoreService.class, coreService)
+                .add(EventDeliveryService.class, eventDeliveryService)
+                .add(VirtualProviderRegistryService.class, providerRegistryService)
+                .add(VirtualNetworkFlowRuleStore.class, flowRuleStore)
+                .add(VirtualNetworkFlowObjectiveStore.class, flowObjectiveStore);
+        TestUtils.setField(manager, "serviceDirectory", testDirectory);
+
+        manager.activate();
+
+        vnet1 = setupVirtualNetworkTopology(manager, TID1);
+        vnet2 = setupVirtualNetworkTopology(manager, TID2);
+
+        service1 = new VirtualNetworkFlowObjectiveManager(manager, vnet1.id());
+        service2 = new VirtualNetworkFlowObjectiveManager(manager, vnet2.id());
+    }
+
+    @After
+    public void tearDownTest() {
+        manager.deactivate();
+        virtualNetworkManagerStore.deactivate();
+    }
+
+    /**
+     * Tests adding a forwarding objective.
+     */
+    @Test
+    public void forwardingObjective() {
+        TrafficSelector selector = DefaultTrafficSelector.emptySelector();
+        TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
+        ForwardingObjective forward =
+                DefaultForwardingObjective.builder()
+                        .fromApp(NetTestTools.APP_ID)
+                        .withFlag(ForwardingObjective.Flag.SPECIFIC)
+                        .withSelector(selector)
+                        .withTreatment(treatment)
+                        .makePermanent()
+                        .add();
+
+        service1.forward(VDID1, forward);
+
+        TestTools.assertAfter(RETRY_MS, () ->
+                assertEquals("1 flowrule entry expected",
+                             1, flowRuleStore.getFlowRuleCount(vnet1.id())));
+        TestTools.assertAfter(RETRY_MS, () ->
+                assertEquals("0 flowrule entry expected",
+                             0, flowRuleStore.getFlowRuleCount(vnet2.id())));
+    }
+
+    //TODO: More test cases for filter, foward, and next
+
+    private class TestProvider extends AbstractVirtualProvider
+            implements VirtualFlowRuleProvider {
+
+        protected TestProvider() {
+            super(new ProviderId("test", "org.onosproject.virtual.testprovider"));
+        }
+
+        @Override
+        public void applyFlowRule(NetworkId networkId, FlowRule... flowRules) {
+
+        }
+
+        @Override
+        public void removeFlowRule(NetworkId networkId, FlowRule... flowRules) {
+
+        }
+
+        @Override
+        public void executeBatch(NetworkId networkId, FlowRuleBatchOperation batch) {
+
+        }
+    }
+}
\ No newline at end of file
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java
index 3cf0457..2d125d1 100644
--- a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java
@@ -40,6 +40,7 @@
 import org.onosproject.incubator.net.virtual.VirtualLink;
 import org.onosproject.incubator.net.virtual.VirtualNetwork;
 import org.onosproject.incubator.net.virtual.VirtualNetworkEvent;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
 import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
 import org.onosproject.incubator.net.virtual.VirtualNetworkGroupStore;
 import org.onosproject.incubator.net.virtual.VirtualNetworkIntent;
@@ -54,6 +55,7 @@
 import org.onosproject.incubator.net.virtual.provider.VirtualNetworkProviderService;
 import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
 import org.onosproject.incubator.store.virtual.impl.DistributedVirtualNetworkStore;
+import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowObjectiveStore;
 import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowRuleStore;
 import org.onosproject.incubator.store.virtual.impl.SimpleVirtualGroupStore;
 import org.onosproject.incubator.store.virtual.impl.SimpleVirtualPacketStore;
@@ -833,7 +835,8 @@
                 .add(ClusterService.class, new ClusterServiceAdapter())
                 .add(VirtualNetworkFlowRuleStore.class, new SimpleVirtualFlowRuleStore())
                 .add(VirtualNetworkPacketStore.class, new SimpleVirtualPacketStore())
-                .add(VirtualNetworkGroupStore.class, new SimpleVirtualGroupStore());
+                .add(VirtualNetworkGroupStore.class, new SimpleVirtualGroupStore())
+                .add(VirtualNetworkFlowObjectiveStore.class, new SimpleVirtualFlowObjectiveStore());
 
         validateServiceGetReturnsSavedInstance(virtualNetwork.id(), FlowRuleService.class);
         validateServiceGetReturnsSavedInstance(virtualNetwork.id(), PacketService.class);
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerTest.java
index 1a49e22..1cedd4a 100644
--- a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerTest.java
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkPacketManagerTest.java
@@ -33,13 +33,18 @@
 import org.onosproject.incubator.net.virtual.NetworkId;
 import org.onosproject.incubator.net.virtual.VirtualDevice;
 import org.onosproject.incubator.net.virtual.VirtualNetwork;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
 import org.onosproject.incubator.net.virtual.VirtualNetworkPacketStore;
 import org.onosproject.incubator.net.virtual.VirtualNetworkStore;
 import org.onosproject.incubator.net.virtual.impl.provider.VirtualProviderManager;
 import org.onosproject.incubator.net.virtual.provider.AbstractVirtualProvider;
+import org.onosproject.incubator.net.virtual.provider.VirtualFlowRuleProvider;
 import org.onosproject.incubator.net.virtual.provider.VirtualPacketProvider;
 import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
 import org.onosproject.incubator.store.virtual.impl.DistributedVirtualNetworkStore;
+import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowObjectiveStore;
+import org.onosproject.incubator.store.virtual.impl.SimpleVirtualFlowRuleStore;
 import org.onosproject.incubator.store.virtual.impl.SimpleVirtualPacketStore;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.NetTestTools;
@@ -49,6 +54,8 @@
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
 import org.onosproject.net.intent.FakeIntentManager;
 import org.onosproject.net.intent.TestableIntentService;
 import org.onosproject.net.packet.DefaultOutboundPacket;
@@ -57,6 +64,7 @@
 import org.onosproject.net.packet.PacketPriority;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.TestStorageService;
 
 import java.nio.ByteBuffer;
@@ -95,12 +103,17 @@
 
     private ApplicationId appId = new TestApplicationId("VirtualPacketManagerTest");
 
+    private VirtualFlowRuleProvider flowRuleProvider = new TestFlowRuleProvider();
+    private SimpleVirtualFlowRuleStore flowRuleStore;
+    private SimpleVirtualFlowObjectiveStore flowObjectiveStore;
+
     @Before
     public void setUp() throws TestUtils.TestUtilsException {
         virtualNetworkManagerStore = new DistributedVirtualNetworkStore();
 
         TestUtils.setField(virtualNetworkManagerStore, "coreService", coreService);
-        TestUtils.setField(virtualNetworkManagerStore, "storageService", new TestStorageService());
+        StorageService storageService = new TestStorageService();
+        TestUtils.setField(virtualNetworkManagerStore, "storageService", storageService);
         virtualNetworkManagerStore.activate();
 
         manager = new VirtualNetworkManager();
@@ -109,8 +122,15 @@
         manager.intentService = intentService;
         NetTestTools.injectEventDispatcher(manager, new TestEventDispatcher());
 
+        flowObjectiveStore = new SimpleVirtualFlowObjectiveStore();
+        TestUtils.setField(flowObjectiveStore, "storageService", storageService);
+        flowObjectiveStore.activate();
+        flowRuleStore = new SimpleVirtualFlowRuleStore();
+        flowRuleStore.activate();
+
         providerRegistryService = new VirtualProviderManager();
         providerRegistryService.registerProvider(provider);
+        providerRegistryService.registerProvider(flowRuleProvider);
 
         testDirectory = new TestServiceDirectory()
                 .add(VirtualNetworkStore.class, virtualNetworkManagerStore)
@@ -118,6 +138,8 @@
                 .add(VirtualProviderRegistryService.class, providerRegistryService)
                 .add(EventDeliveryService.class, eventDeliveryService)
                 .add(ClusterService.class, new ClusterServiceAdapter())
+                .add(VirtualNetworkFlowRuleStore.class, flowRuleStore)
+                .add(VirtualNetworkFlowObjectiveStore.class, flowObjectiveStore)
                 .add(VirtualNetworkPacketStore.class, packetStore);
         TestUtils.setField(manager, "serviceDirectory", testDirectory);
 
@@ -373,4 +395,27 @@
             return false;
         }
     }
+
+    private class TestFlowRuleProvider extends AbstractVirtualProvider
+            implements VirtualFlowRuleProvider {
+
+        protected TestFlowRuleProvider() {
+            super(new ProviderId("test", "org.onosproject.virtual.testprovider"));
+        }
+
+        @Override
+        public void applyFlowRule(NetworkId networkId, FlowRule... flowRules) {
+
+        }
+
+        @Override
+        public void removeFlowRule(NetworkId networkId, FlowRule... flowRules) {
+
+        }
+
+        @Override
+        public void executeBatch(NetworkId networkId, FlowRuleBatchOperation batch) {
+
+        }
+    }
 }
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowObjectiveStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowObjectiveStore.java
new file mode 100644
index 0000000..1ac5396
--- /dev/null
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowObjectiveStore.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.store.virtual.impl;
+
+import com.google.common.collect.Maps;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowObjectiveStore;
+import org.onosproject.net.behaviour.DefaultNextGroup;
+import org.onosproject.net.behaviour.NextGroup;
+import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
+import org.onosproject.net.flowobjective.ObjectiveEvent;
+import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Single instance implementation of store to manage
+ * the inventory of created next groups for virtual network.
+ */
+@Component(immediate = true)
+@Service
+public class SimpleVirtualFlowObjectiveStore
+        extends AbstractVirtualStore<ObjectiveEvent, FlowObjectiveStoreDelegate>
+        implements VirtualNetworkFlowObjectiveStore {
+
+    private final Logger log = getLogger(getClass());
+
+    private ConcurrentMap<NetworkId, ConcurrentMap<Integer, byte[]>> nextGroupsMap;
+
+    private AtomicCounter nextIds;
+
+    // event queue to separate map-listener threads from event-handler threads (tpool)
+    private BlockingQueue<VirtualObjectiveEvent> eventQ;
+    private ExecutorService tpool;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Activate
+    public void activate() {
+        tpool = Executors.newFixedThreadPool(4, groupedThreads("onos/virtual/flobj-notifier", "%d", log));
+        eventQ = new LinkedBlockingQueue<>();
+        tpool.execute(new FlowObjectiveNotifier());
+
+        nextGroupsMap = Maps.newConcurrentMap();
+
+        nextIds = storageService.getAtomicCounter("next-objective-counter");
+        log.info("Started");
+    }
+
+    private ConcurrentMap<Integer, byte[]> getNextGroups(NetworkId networkId) {
+        nextGroupsMap.computeIfAbsent(networkId, n -> Maps.newConcurrentMap());
+        return nextGroupsMap.get(networkId);
+    }
+
+    @Override
+    public void putNextGroup(NetworkId networkId, Integer nextId, NextGroup group) {
+        ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
+        nextGroups.put(nextId, group.data());
+
+        eventQ.add(new VirtualObjectiveEvent(networkId, ObjectiveEvent.Type.ADD, nextId));
+    }
+
+    @Override
+    public NextGroup getNextGroup(NetworkId networkId, Integer nextId) {
+        ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
+        return new DefaultNextGroup(nextGroups.get(nextId));
+    }
+
+    @Override
+    public NextGroup removeNextGroup(NetworkId networkId, Integer nextId) {
+        ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
+        eventQ.add(new VirtualObjectiveEvent(networkId, ObjectiveEvent.Type.REMOVE, nextId));
+        return new DefaultNextGroup(nextGroups.remove(nextId));
+    }
+
+    @Override
+    public Map<Integer, NextGroup> getAllGroups(NetworkId networkId) {
+        ConcurrentMap<Integer, byte[]> nextGroups = getNextGroups(networkId);
+
+        Map<Integer, NextGroup> nextGroupMappings = new HashMap<>();
+        for (int key : nextGroups.keySet()) {
+            NextGroup nextGroup = getNextGroup(networkId, key);
+            if (nextGroup != null) {
+                nextGroupMappings.put(key, nextGroup);
+            }
+        }
+        return nextGroupMappings;
+    }
+
+    @Override
+    public int allocateNextId(NetworkId networkId) {
+        return (int) nextIds.incrementAndGet();
+    }
+
+    private class FlowObjectiveNotifier implements Runnable {
+        @Override
+        public void run() {
+            try {
+                while (!Thread.currentThread().isInterrupted()) {
+                    VirtualObjectiveEvent vEvent = eventQ.take();
+                    notifyDelegate(vEvent.networkId(), vEvent);
+                }
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    private class VirtualObjectiveEvent extends ObjectiveEvent {
+        NetworkId networkId;
+
+        public VirtualObjectiveEvent(NetworkId networkId, Type type,
+                                     Integer objective) {
+            super(type, objective);
+            this.networkId = networkId;
+        }
+
+        NetworkId networkId() {
+            return networkId;
+        }
+    }
+}
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowRuleStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowRuleStore.java
index 5f71b57..0952574 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowRuleStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualFlowRuleStore.java
@@ -151,8 +151,12 @@
 
     @Override
     public int getFlowRuleCount(NetworkId networkId) {
-
         int sum = 0;
+
+        if (flowEntries.get(networkId) == null) {
+            return 0;
+        }
+
         for (ConcurrentMap<FlowId, List<StoredFlowEntry>> ft :
                 flowEntries.get(networkId).values()) {
             for (List<StoredFlowEntry> fes : ft.values()) {