[ONOS-5708] Implement VirtualNetworkFlowRuleManager

Changes
1. VirtualFlowRuleStore is added
2. VirtualFlowRuleManager test code is added
3. VirtualProviders can be registered through VirtualNetworkManager
4. VirtualFlowRuleManager is implemented
5. VnetServiceWithProvider service is added for interconnectiong
    VirtualProvider and VirtualProviderService

TODO
1. Implement more test cases

Change-Id: I0a1ce633b8ed78f0529e40ed34ae702974f53f69
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkAdminService.java b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkAdminService.java
index 38e745f..a7fbe13 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkAdminService.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkAdminService.java
@@ -176,5 +176,4 @@
      * @throws org.onlab.util.ItemNotFoundException if no such network or port found
      */
     void removeVirtualPort(NetworkId networkId, DeviceId deviceId, PortNumber portNumber);
-
 }
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkFlowRuleStore.java b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkFlowRuleStore.java
new file mode 100644
index 0000000..faca465
--- /dev/null
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkFlowRuleStore.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.net.virtual;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleBatchEvent;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.flow.FlowRuleStoreDelegate;
+import org.onosproject.net.flow.TableStatisticsEntry;
+import org.onosproject.store.Store;
+
+import java.util.List;
+
+public interface VirtualNetworkFlowRuleStore
+        extends Store<FlowRuleBatchEvent, FlowRuleStoreDelegate> {
+    /**
+     * Returns the number of flow rule in the store.
+     *
+     * @param networkId virtual network identifier
+     * @return number of flow rules
+     */
+    int getFlowRuleCount(NetworkId networkId);
+
+    /**
+     * Returns the stored flow.
+     *
+     * @param networkId virtual network identifier
+     * @param rule the rule to look for
+     * @return a flow rule
+     */
+    FlowEntry getFlowEntry(NetworkId networkId, FlowRule rule);
+
+    /**
+     * Returns the flow entries associated with a device.
+     *
+     * @param networkId virtual network identifier
+     * @param deviceId the device ID
+     * @return the flow entries
+     */
+    Iterable<FlowEntry> getFlowEntries(NetworkId networkId, DeviceId deviceId);
+
+    /**
+     * // TODO: Better description of method behavior.
+     * Stores a new flow rule without generating events.
+     *
+     * @param networkId virtual network identifier
+     * @param rule the flow rule to add
+     * @deprecated in Cardinal Release
+     */
+    @Deprecated
+    void storeFlowRule(NetworkId networkId, FlowRule rule);
+
+    /**
+     * Stores a batch of flow rules.
+     *
+     * @param networkId virtual network identifier
+     * @param batchOperation batch of flow rules.
+     *           A batch can contain flow rules for a single device only.
+     *
+     */
+    void storeBatch(NetworkId networkId, FlowRuleBatchOperation batchOperation);
+
+    /**
+     * Invoked on the completion of a storeBatch operation.
+     *
+     * @param networkId virtual network identifier
+     * @param event flow rule batch event
+     */
+    void batchOperationComplete(NetworkId networkId, FlowRuleBatchEvent event);
+
+    /**
+     * Marks a flow rule for deletion. Actual deletion will occur
+     * when the provider indicates that the flow has been removed.
+     *
+     * @param networkId virtual network identifier
+     * @param rule the flow rule to delete
+     */
+    void deleteFlowRule(NetworkId networkId, FlowRule rule);
+
+    /**
+     * Stores a new flow rule, or updates an existing entry.
+     *
+     * @param networkId virtual network identifier
+     * @param rule the flow rule to add or update
+     * @return flow_added event, or null if just an update
+     */
+    FlowRuleEvent addOrUpdateFlowRule(NetworkId networkId, FlowEntry rule);
+
+    /**
+     * Removes an existing flow entry.
+     *
+     * @param rule the flow entry to remove
+     * @param networkId virtual network identifier
+     * @return flow_removed event, or null if nothing removed
+     */
+    FlowRuleEvent removeFlowRule(NetworkId networkId, FlowEntry rule);
+
+    /**
+     * Marks a flow rule as PENDING_ADD during retry.
+     *
+     * Emits flow_update event if the state is changed
+     *
+     * @param networkId virtual network identifier
+     * @param rule the flow rule that is retrying
+     * @return flow_updated event, or null if nothing updated
+     */
+    FlowRuleEvent pendingFlowRule(NetworkId networkId, FlowEntry rule);
+
+    /**
+     * Removes all flow entries of given device from store.
+     *
+     * @param networkId virtual network identifier
+     * @param deviceId device id
+     */
+    default void purgeFlowRule(NetworkId networkId, DeviceId deviceId) {}
+
+    /**
+     * Removes all flow entries from store.
+     *
+     * @param networkId virtual network identifier
+     */
+    void purgeFlowRules(NetworkId networkId);
+
+    /**
+     * Updates the flow table statistics of the specified device using
+     * the given statistics.
+     *
+     * @param networkId virtual network identifier
+     * @param deviceId    device identifier
+     * @param tableStats   list of table statistics
+     * @return ready to send event describing what occurred;
+     */
+    FlowRuleEvent updateTableStatistics(NetworkId networkId, DeviceId deviceId,
+                                        List<TableStatisticsEntry> tableStats);
+
+    /**
+     * Returns the flow table statistics associated with a device.
+     *
+     * @param networkId virtual network identifier
+     * @param deviceId the device ID
+     * @return the flow table statistics
+     */
+    Iterable<TableStatisticsEntry> getTableStatistics(NetworkId networkId, DeviceId deviceId);
+}
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/AbstractVirtualProviderService.java b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/AbstractVirtualProviderService.java
index e12f1ef..8603be5 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/AbstractVirtualProviderService.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/AbstractVirtualProviderService.java
@@ -25,7 +25,7 @@
  * @param <P> type of the information provider
  */
 public abstract class AbstractVirtualProviderService<P extends VirtualProvider>
-        implements VirtualProviderService {
+        implements VirtualProviderService<P> {
 
     private boolean isValid = true;
     private P provider = null;
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualFlowRuleProvider.java b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualFlowRuleProvider.java
index 89f1f43..87fe7a5 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualFlowRuleProvider.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualFlowRuleProvider.java
@@ -17,6 +17,7 @@
 
 import org.onosproject.incubator.net.virtual.NetworkId;
 import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
 
 /**
  * Abstraction of a virtual flow rule provider.
@@ -42,4 +43,13 @@
      * @param flowRules one or more flow rules
      */
     void removeFlowRule(NetworkId networkId, FlowRule... flowRules);
+
+    /**
+     * Installs a batch of flow rules. Each flowrule is associated to an
+     * operation which results in either addition, removal or modification.
+     *
+     * @param networkId the identity of the virtual network where this rule applies
+     * @param batch a batch of flow rules
+     */
+    void executeBatch(NetworkId networkId, FlowRuleBatchOperation batch);
 }
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualFlowRuleProviderService.java b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualFlowRuleProviderService.java
index 3f6bf8c..9b4fb50 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualFlowRuleProviderService.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualFlowRuleProviderService.java
@@ -15,8 +15,8 @@
  */
 package org.onosproject.incubator.net.virtual.provider;
 
-import org.onosproject.incubator.net.virtual.NetworkId;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.CompletedBatchOperation;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.TableStatisticsEntry;
 
@@ -32,42 +32,44 @@
     /**
      * Signals that a flow rule that was previously installed has been removed.
      *
-     * @param networkId the identity of the virtual network where this rule applies
      * @param flowEntry removed flow entry
      */
-    void flowRemoved(NetworkId networkId, FlowEntry flowEntry);
+    void flowRemoved(FlowEntry flowEntry);
 
     /**
      * Pushes the collection of flow entries currently applied on the given
      * virtual device.
      *
-     * @param networkId the identity of the virtual network where this rule applies
      * @param deviceId device identifier
      * @param flowEntries collection of flow rules
      */
-    void pushFlowMetrics(NetworkId networkId, DeviceId deviceId,
-                         Iterable<FlowEntry> flowEntries);
+    void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries);
 
     /**
      * Pushes the collection of flow entries currently applied on the given
      * device without flowMissing process.
      *
-     * @param networkId the identity of the virtual network where this rule applies
      * @param deviceId device identifier
      * @param flowEntries collection of flow rules
      */
-    void pushFlowMetricsWithoutFlowMissing(NetworkId networkId, DeviceId deviceId,
-                                           Iterable<FlowEntry> flowEntries);
+    void pushFlowMetricsWithoutFlowMissing(DeviceId deviceId, Iterable<FlowEntry> flowEntries);
 
     /**
      * Pushes the collection of table statistics entries currently extracted
      * from the given virtual device.
      *
-     * @param networkId the identity of the virtual network where this rule applies
      * @param deviceId device identifier
      * @param tableStatsEntries collection of flow table statistics entries
      */
-    void pushTableStatistics(NetworkId networkId, DeviceId deviceId,
-                             List<TableStatisticsEntry> tableStatsEntries);
+    void pushTableStatistics(DeviceId deviceId, List<TableStatisticsEntry> tableStatsEntries);
+
+    /**
+     * Indicates to the core that the requested batch operation has
+     * been completed.
+     *
+     * @param batchId the batch which was processed
+     * @param operation the resulting outcome of the operation
+     */
+    void batchOperationCompleted(long batchId, CompletedBatchOperation operation);
 
 }
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkProvider.java b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualNetworkProvider.java
similarity index 92%
rename from incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkProvider.java
rename to incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualNetworkProvider.java
index 104c9d3..9f05f15 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkProvider.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualNetworkProvider.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015-present Open Networking Laboratory
+ * Copyright 2016-present Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,9 +13,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.incubator.net.virtual;
+package org.onosproject.incubator.net.virtual.provider;
 
 import org.onosproject.incubator.net.tunnel.TunnelId;
+import org.onosproject.incubator.net.virtual.NetworkId;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.provider.Provider;
 
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkProviderRegistry.java b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualNetworkProviderRegistry.java
similarity index 88%
rename from incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkProviderRegistry.java
rename to incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualNetworkProviderRegistry.java
index 9febbe4..465301f 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkProviderRegistry.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualNetworkProviderRegistry.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015-present Open Networking Laboratory
+ * Copyright 2016-present Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.incubator.net.virtual;
+package org.onosproject.incubator.net.virtual.provider;
 
 import org.onosproject.net.provider.ProviderRegistry;
 
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkProviderService.java b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualNetworkProviderService.java
similarity index 92%
rename from incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkProviderService.java
rename to incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualNetworkProviderService.java
index 50f0fc3..330e5e8 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkProviderService.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualNetworkProviderService.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015-present Open Networking Laboratory
+ * Copyright 2016-present Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,9 +13,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.incubator.net.virtual;
+package org.onosproject.incubator.net.virtual.provider;
 
 import org.onosproject.incubator.net.tunnel.TunnelId;
+import org.onosproject.incubator.net.virtual.NetworkId;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.provider.ProviderService;
 
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowRuleManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowRuleManager.java
index 1bf8a4a..b7dd1cb 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowRuleManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowRuleManager.java
@@ -16,101 +16,499 @@
 
 package org.onosproject.incubator.net.virtual.impl;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import org.onlab.osgi.ServiceDirectory;
 import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.IdGenerator;
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.incubator.net.virtual.VirtualNetwork;
-import org.onosproject.incubator.net.virtual.VirtualNetworkService;
+import org.onosproject.incubator.net.virtual.VirtualNetworkAdminService;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
 import org.onosproject.incubator.net.virtual.VnetService;
+import org.onosproject.incubator.net.virtual.provider.AbstractVirtualProviderService;
+import org.onosproject.incubator.net.virtual.provider.VirtualFlowRuleProvider;
+import org.onosproject.incubator.net.virtual.provider.VirtualFlowRuleProviderService;
+import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
+import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.CompletedBatchOperation;
+import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleBatchEntry;
+import org.onosproject.net.flow.FlowRuleBatchEvent;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
+import org.onosproject.net.flow.FlowRuleBatchRequest;
 import org.onosproject.net.flow.FlowRuleEvent;
 import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleOperation;
 import org.onosproject.net.flow.FlowRuleOperations;
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.TableStatisticsEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.groupedThreads;
 
 /**
  * Flow rule service implementation built on the virtual network service.
  */
-public class VirtualNetworkFlowRuleManager extends AbstractListenerManager<FlowRuleEvent, FlowRuleListener>
+public class VirtualNetworkFlowRuleManager
+        extends AbstractListenerManager<FlowRuleEvent, FlowRuleListener>
         implements FlowRuleService, VnetService {
 
     private static final String NETWORK_NULL = "Network ID cannot be null";
+    private static final String VIRTUAL_FLOW_OP_TOPIC = "virtual-flow-ops-ids";
+    private static final String THREAD_GROUP_NAME = "onos/virtual-flowservice";
+    private static final String DEVICE_INSTALLER_PATTERN = "device-installer-%d";
+    private static final String OPERATION_PATTERN = "operations-%d";
+    public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
     private final VirtualNetwork network;
-    private final VirtualNetworkService manager;
+    private final VirtualNetworkAdminService manager;
+    private final VirtualNetworkFlowRuleStore store;
+    private final DeviceService deviceService;
+
+    protected ExecutorService deviceInstallers =
+            Executors.newFixedThreadPool(32,
+                                         groupedThreads(THREAD_GROUP_NAME,
+                                                        DEVICE_INSTALLER_PATTERN, log));
+    protected ExecutorService operationsService =
+            Executors.newFixedThreadPool(32,
+                                         groupedThreads(THREAD_GROUP_NAME,
+                                                        OPERATION_PATTERN, log));
+    private IdGenerator idGenerator;
+
+    private final Map<Long, FlowOperationsProcessor> pendingFlowOperations = new ConcurrentHashMap<>();
+
+    private VirtualProviderRegistryService providerRegistryService = null;
+    private InternalFlowRuleProviderService innerProviderService = null;
+
+
 
     /**
      * Creates a new VirtualNetworkFlowRuleService object.
      *
      * @param virtualNetworkManager virtual network manager service
      * @param network               virtual network
+     * @param serviceDirectory      service directory
      */
-    public VirtualNetworkFlowRuleManager(VirtualNetworkService virtualNetworkManager, VirtualNetwork network) {
+    public VirtualNetworkFlowRuleManager(VirtualNetworkAdminService virtualNetworkManager,
+                                         VirtualNetwork network,
+                                         ServiceDirectory serviceDirectory) {
         checkNotNull(network, NETWORK_NULL);
         this.network = network;
-        this.manager = virtualNetworkManager;
+
+        manager = virtualNetworkManager;
+        store = serviceDirectory.get(VirtualNetworkFlowRuleStore.class);
+        idGenerator = serviceDirectory.get(CoreService.class)
+                .getIdGenerator(VIRTUAL_FLOW_OP_TOPIC + network.id().toString());
+
+        providerRegistryService =
+                serviceDirectory.get(VirtualProviderRegistryService.class);
+        innerProviderService = new InternalFlowRuleProviderService();
+        providerRegistryService.registerProviderService(network.id(), innerProviderService);
+
+        this.deviceService = manager.get(network.id(), DeviceService.class);
     }
 
     @Override
     public int getFlowRuleCount() {
-        return 0;
+        return store.getFlowRuleCount(network.id());
     }
 
     @Override
     public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
-        return null;
+        return store.getFlowEntries(network.id(), deviceId);
     }
 
     @Override
     public void applyFlowRules(FlowRule... flowRules) {
-
+        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+        for (FlowRule flowRule : flowRules) {
+            builder.add(flowRule);
+        }
+        apply(builder.build());
     }
 
     @Override
     public void purgeFlowRules(DeviceId deviceId) {
-
+        store.purgeFlowRule(network.id(), deviceId);
     }
 
     @Override
     public void removeFlowRules(FlowRule... flowRules) {
-
+        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+        for (FlowRule flowRule : flowRules) {
+            builder.remove(flowRule);
+        }
+        apply(builder.build());
     }
 
     @Override
-    public void removeFlowRulesById(ApplicationId appId) {
-
+    public void removeFlowRulesById(ApplicationId id) {
+        removeFlowRules(Iterables.toArray(getFlowRulesById(id), FlowRule.class));
     }
 
     @Override
     public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
-        return null;
+        DeviceService deviceService = manager.get(network.id(), DeviceService.class);
+
+        Set<FlowRule> flowEntries = Sets.newHashSet();
+        for (Device d : deviceService.getDevices()) {
+            for (FlowEntry flowEntry : store.getFlowEntries(network.id(), d.id())) {
+                if (flowEntry.appId() == id.id()) {
+                    flowEntries.add(flowEntry);
+                }
+            }
+        }
+        return flowEntries;
     }
 
     @Override
     public Iterable<FlowEntry> getFlowEntriesById(ApplicationId id) {
-        return null;
+        DeviceService deviceService = manager.get(network.id(), DeviceService.class);
+
+        Set<FlowEntry> flowEntries = Sets.newHashSet();
+        for (Device d : deviceService.getDevices()) {
+            for (FlowEntry flowEntry : store.getFlowEntries(network.id(), d.id())) {
+                if (flowEntry.appId() == id.id()) {
+                    flowEntries.add(flowEntry);
+                }
+            }
+        }
+        return flowEntries;
     }
 
     @Override
     public Iterable<FlowRule> getFlowRulesByGroupId(ApplicationId appId, short groupId) {
-        return null;
+        DeviceService deviceService = manager.get(network.id(), DeviceService.class);
+
+        Set<FlowRule> matches = Sets.newHashSet();
+        long toLookUp = ((long) appId.id() << 16) | groupId;
+        for (Device d : deviceService.getDevices()) {
+            for (FlowEntry flowEntry : store.getFlowEntries(network.id(), d.id())) {
+                if ((flowEntry.id().value() >>> 32) == toLookUp) {
+                    matches.add(flowEntry);
+                }
+            }
+        }
+        return matches;
     }
 
     @Override
     public void apply(FlowRuleOperations ops) {
-
+        operationsService.execute(new FlowOperationsProcessor(ops));
     }
 
     @Override
     public Iterable<TableStatisticsEntry> getFlowTableStatistics(DeviceId deviceId) {
-        return null;
+        return store.getTableStatistics(network.id(), deviceId);
+    }
+
+    private static FlowRuleBatchEntry.FlowRuleOperation mapOperationType(FlowRuleOperation.Type input) {
+        switch (input) {
+            case ADD:
+                return FlowRuleBatchEntry.FlowRuleOperation.ADD;
+            case MODIFY:
+                return FlowRuleBatchEntry.FlowRuleOperation.MODIFY;
+            case REMOVE:
+                return FlowRuleBatchEntry.FlowRuleOperation.REMOVE;
+            default:
+                throw new UnsupportedOperationException("Unknown flow rule type " + input);
+        }
     }
 
     @Override
     public VirtualNetwork network() {
-        return network;
+        return this.network;
+    }
+
+    private class FlowOperationsProcessor implements Runnable {
+        // Immutable
+        private final FlowRuleOperations fops;
+
+        // Mutable
+        private final List<Set<FlowRuleOperation>> stages;
+        private final Set<DeviceId> pendingDevices = new HashSet<>();
+        private boolean hasFailed = false;
+
+        FlowOperationsProcessor(FlowRuleOperations ops) {
+            this.stages = Lists.newArrayList(ops.stages());
+            this.fops = ops;
+        }
+
+        @Override
+        public synchronized void run() {
+            if (stages.size() > 0) {
+                process(stages.remove(0));
+            } else if (!hasFailed) {
+                fops.callback().onSuccess(fops);
+            }
+        }
+
+        private void process(Set<FlowRuleOperation> ops) {
+            Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches = ArrayListMultimap.create();
+
+            for (FlowRuleOperation op : ops) {
+                perDeviceBatches.put(op.rule().deviceId(),
+                                     new FlowRuleBatchEntry(mapOperationType(op.type()), op.rule()));
+            }
+            pendingDevices.addAll(perDeviceBatches.keySet());
+
+            for (DeviceId deviceId : perDeviceBatches.keySet()) {
+                long id = idGenerator.getNewId();
+                final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
+                                                                            deviceId, id);
+                pendingFlowOperations.put(id, this);
+                deviceInstallers.execute(() -> store.storeBatch(network.id(), b));
+            }
+        }
+
+        synchronized void satisfy(DeviceId devId) {
+            pendingDevices.remove(devId);
+            if (pendingDevices.isEmpty()) {
+                operationsService.execute(this);
+            }
+        }
+
+        synchronized void fail(DeviceId devId, Set<? extends FlowRule> failures) {
+            hasFailed = true;
+            pendingDevices.remove(devId);
+            if (pendingDevices.isEmpty()) {
+                operationsService.execute(this);
+            }
+
+            FlowRuleOperations.Builder failedOpsBuilder = FlowRuleOperations.builder();
+            failures.forEach(failedOpsBuilder::add);
+
+            fops.callback().onError(failedOpsBuilder.build());
+        }
+    }
+
+    private class InternalFlowRuleProviderService
+            extends AbstractVirtualProviderService<VirtualFlowRuleProvider>
+            implements VirtualFlowRuleProviderService {
+
+        final Map<FlowEntry, Long> firstSeen = Maps.newConcurrentMap();
+        final Map<FlowEntry, Long> lastSeen = Maps.newConcurrentMap();
+
+        @Override
+        public void flowRemoved(FlowEntry flowEntry) {
+            checkNotNull(flowEntry, FLOW_RULE_NULL);
+            checkValidity();
+
+            lastSeen.remove(flowEntry);
+            firstSeen.remove(flowEntry);
+            FlowEntry stored = store.getFlowEntry(network.id(), flowEntry);
+            if (stored == null) {
+                log.debug("Rule already evicted from store: {}", flowEntry);
+                return;
+            }
+            if (flowEntry.reason() == FlowEntry.FlowRemoveReason.HARD_TIMEOUT) {
+                ((DefaultFlowEntry) stored).setState(FlowEntry.FlowEntryState.REMOVED);
+            }
+            Device device = deviceService.getDevice(flowEntry.deviceId());
+
+            //FIXME: obtains provider from devices providerId()
+            FlowRuleEvent event = null;
+            switch (stored.state()) {
+                case ADDED:
+                case PENDING_ADD:
+                    provider().applyFlowRule(network.id(), stored);
+                    break;
+                case PENDING_REMOVE:
+                case REMOVED:
+                    event = store.removeFlowRule(network.id(), stored);
+                    break;
+                default:
+                    break;
+
+            }
+            if (event != null) {
+                log.debug("Flow {} removed", flowEntry);
+                post(event);
+            }
+        }
+
+
+        private void flowMissing(FlowEntry flowRule) {
+            checkNotNull(flowRule, FLOW_RULE_NULL);
+            checkValidity();
+
+            FlowRuleEvent event = null;
+            switch (flowRule.state()) {
+                case PENDING_REMOVE:
+                case REMOVED:
+                    event = store.removeFlowRule(network.id(), flowRule);
+                    break;
+                case ADDED:
+                case PENDING_ADD:
+                    event = store.pendingFlowRule(network.id(), flowRule);
+                    try {
+                        provider().applyFlowRule(network.id(), flowRule);
+                    } catch (UnsupportedOperationException e) {
+                        log.warn(e.getMessage());
+                        if (flowRule instanceof DefaultFlowEntry) {
+                            //FIXME modification of "stored" flow entry outside of store
+                            ((DefaultFlowEntry) flowRule).setState(FlowEntry.FlowEntryState.FAILED);
+                        }
+                    }
+                    break;
+                default:
+                    log.debug("Flow {} has not been installed.", flowRule);
+            }
+
+            if (event != null) {
+                log.debug("Flow {} removed", flowRule);
+                post(event);
+            }
+        }
+
+        private void extraneousFlow(FlowRule flowRule) {
+            checkNotNull(flowRule, FLOW_RULE_NULL);
+            checkValidity();
+
+            provider().removeFlowRule(network.id(), flowRule);
+            log.debug("Flow {} is on switch but not in store.", flowRule);
+        }
+
+        private void flowAdded(FlowEntry flowEntry) {
+            checkNotNull(flowEntry, FLOW_RULE_NULL);
+
+            if (checkRuleLiveness(flowEntry, store.getFlowEntry(network.id(), flowEntry))) {
+                FlowRuleEvent event = store.addOrUpdateFlowRule(network.id(), flowEntry);
+                if (event == null) {
+                    log.debug("No flow store event generated.");
+                } else {
+                    log.trace("Flow {} {}", flowEntry, event.type());
+                    post(event);
+                }
+            } else {
+                log.debug("Removing flow rules....");
+                removeFlowRules(flowEntry);
+            }
+        }
+
+        private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
+            if (storedRule == null) {
+                return false;
+            }
+            if (storedRule.isPermanent()) {
+                return true;
+            }
+
+            final long timeout = storedRule.timeout() * 1000;
+            final long currentTime = System.currentTimeMillis();
+
+            // Checking flow with hardTimeout
+            if (storedRule.hardTimeout() != 0) {
+                if (!firstSeen.containsKey(storedRule)) {
+                    // First time rule adding
+                    firstSeen.put(storedRule, currentTime);
+                } else {
+                    Long first = firstSeen.get(storedRule);
+                    final long hardTimeout = storedRule.hardTimeout() * 1000;
+                    if ((currentTime - first) > hardTimeout) {
+                        return false;
+                    }
+                }
+            }
+
+            if (storedRule.packets() != swRule.packets()) {
+                lastSeen.put(storedRule, currentTime);
+                return true;
+            }
+            if (!lastSeen.containsKey(storedRule)) {
+                // checking for the first time
+                lastSeen.put(storedRule, storedRule.lastSeen());
+                // Use following if lastSeen attr. was removed.
+                //lastSeen.put(storedRule, currentTime);
+            }
+            Long last = lastSeen.get(storedRule);
+
+            // concurrently removed? let the liveness check fail
+            return last != null && (currentTime - last) <= timeout;
+        }
+
+        @Override
+        public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
+            pushFlowMetricsInternal(deviceId, flowEntries, true);
+        }
+
+        @Override
+        public void pushFlowMetricsWithoutFlowMissing(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
+            pushFlowMetricsInternal(deviceId, flowEntries, false);
+        }
+
+        private void pushFlowMetricsInternal(DeviceId deviceId, Iterable<FlowEntry> flowEntries,
+                                             boolean useMissingFlow) {
+            Map<FlowEntry, FlowEntry> storedRules = Maps.newHashMap();
+            store.getFlowEntries(network.id(), deviceId).forEach(f -> storedRules.put(f, f));
+
+            for (FlowEntry rule : flowEntries) {
+                try {
+                    FlowEntry storedRule = storedRules.remove(rule);
+                    if (storedRule != null) {
+                        if (storedRule.exactMatch(rule)) {
+                            // we both have the rule, let's update some info then.
+                            flowAdded(rule);
+                        } else {
+                            // the two rules are not an exact match - remove the
+                            // switch's rule and install our rule
+                            extraneousFlow(rule);
+                            flowMissing(storedRule);
+                        }
+                    }
+                } catch (Exception e) {
+                    log.debug("Can't process added or extra rule {}", e.getMessage());
+                }
+            }
+
+            // DO NOT reinstall
+            if (useMissingFlow) {
+                for (FlowEntry rule : storedRules.keySet()) {
+                    try {
+                        // there are rules in the store that aren't on the switch
+                        log.debug("Adding rule in store, but not on switch {}", rule);
+                        flowMissing(rule);
+                    } catch (Exception e) {
+                        log.debug("Can't add missing flow rule:", e);
+                    }
+                }
+            }
+        }
+
+        public void batchOperationCompleted(long batchId, CompletedBatchOperation operation) {
+            store.batchOperationComplete(network.id(), FlowRuleBatchEvent.completed(
+                    new FlowRuleBatchRequest(batchId, Collections.emptySet()),
+                    operation
+            ));
+        }
+
+        @Override
+        public void pushTableStatistics(DeviceId deviceId,
+                                        List<TableStatisticsEntry> tableStats) {
+            store.updateTableStatistics(network.id(), deviceId, tableStats);
+        }
     }
 }
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManager.java
index b40386d..f3c7281 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManager.java
@@ -38,14 +38,14 @@
 import org.onosproject.incubator.net.virtual.VirtualNetworkEvent;
 import org.onosproject.incubator.net.virtual.VirtualNetworkIntent;
 import org.onosproject.incubator.net.virtual.VirtualNetworkListener;
-import org.onosproject.incubator.net.virtual.VirtualNetworkProvider;
-import org.onosproject.incubator.net.virtual.VirtualNetworkProviderRegistry;
-import org.onosproject.incubator.net.virtual.VirtualNetworkProviderService;
 import org.onosproject.incubator.net.virtual.VirtualNetworkService;
 import org.onosproject.incubator.net.virtual.VirtualNetworkStore;
 import org.onosproject.incubator.net.virtual.VirtualNetworkStoreDelegate;
 import org.onosproject.incubator.net.virtual.VirtualPort;
 import org.onosproject.incubator.net.virtual.VnetService;
+import org.onosproject.incubator.net.virtual.provider.VirtualNetworkProvider;
+import org.onosproject.incubator.net.virtual.provider.VirtualNetworkProviderRegistry;
+import org.onosproject.incubator.net.virtual.provider.VirtualNetworkProviderService;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.HostId;
@@ -394,7 +394,8 @@
         } else if (serviceKey.serviceClass.equals(PathService.class)) {
             service = new VirtualNetworkPathManager(this, network);
         } else if (serviceKey.serviceClass.equals(FlowRuleService.class)) {
-            service = new VirtualNetworkFlowRuleManager(this, network);
+            service = new VirtualNetworkFlowRuleManager(this, network,
+                                                        new DefaultServiceDirectory());
         } else {
             return null;
         }
@@ -498,6 +499,7 @@
         return new InternalVirtualNetworkProviderService(provider);
     }
 
+
     /**
      * Service issued to registered virtual network providers so that they
      * can interact with the core.
@@ -599,5 +601,4 @@
             }
         }
     }
-
 }
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/provider/DefaultVirtualNetworkProvider.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/provider/DefaultVirtualNetworkProvider.java
index 3e42579..8f7c55e 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/provider/DefaultVirtualNetworkProvider.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/provider/DefaultVirtualNetworkProvider.java
@@ -25,9 +25,9 @@
 import org.onosproject.incubator.net.tunnel.TunnelId;
 import org.onosproject.incubator.net.virtual.DefaultVirtualLink;
 import org.onosproject.incubator.net.virtual.NetworkId;
-import org.onosproject.incubator.net.virtual.VirtualNetworkProvider;
-import org.onosproject.incubator.net.virtual.VirtualNetworkProviderRegistry;
-import org.onosproject.incubator.net.virtual.VirtualNetworkProviderService;
+import org.onosproject.incubator.net.virtual.provider.VirtualNetworkProvider;
+import org.onosproject.incubator.net.virtual.provider.VirtualNetworkProviderRegistry;
+import org.onosproject.incubator.net.virtual.provider.VirtualNetworkProviderService;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.Link;
 import org.onosproject.net.Path;
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowRuleManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowRuleManagerTest.java
new file mode 100644
index 0000000..086e9b8
--- /dev/null
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkFlowRuleManagerTest.java
@@ -0,0 +1,517 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.net.virtual.impl;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.junit.TestUtils;
+import org.onlab.osgi.ServiceDirectory;
+import org.onlab.osgi.TestServiceDirectory;
+import org.onlab.rest.BaseResource;
+import org.onosproject.TestApplicationId;
+import org.onosproject.common.event.impl.TestEventDispatcher;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.TenantId;
+import org.onosproject.incubator.net.virtual.VirtualDevice;
+import org.onosproject.incubator.net.virtual.VirtualLink;
+import org.onosproject.incubator.net.virtual.VirtualNetwork;
+import org.onosproject.incubator.net.virtual.VirtualNetworkFlowRuleStore;
+import org.onosproject.incubator.net.virtual.VirtualNetworkStore;
+import org.onosproject.incubator.net.virtual.impl.provider.VirtualProviderManager;
+import org.onosproject.incubator.net.virtual.provider.AbstractVirtualProvider;
+import org.onosproject.incubator.net.virtual.provider.VirtualFlowRuleProvider;
+import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
+import org.onosproject.incubator.store.virtual.impl.DistributedVirtualNetworkStore;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Link;
+import org.onosproject.net.NetTestTools;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.TestDeviceParams;
+import org.onosproject.net.flow.DefaultFlowEntry;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowId;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleBatchEntry;
+import org.onosproject.net.flow.FlowRuleBatchEvent;
+import org.onosproject.net.flow.FlowRuleBatchOperation;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.FlowRuleStoreDelegate;
+import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.net.flow.TableStatisticsEntry;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.intent.FakeIntentManager;
+import org.onosproject.net.intent.TestableIntentService;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.service.TestStorageService;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.junit.Assert.*;
+
+public class VirtualNetworkFlowRuleManagerTest extends TestDeviceParams {
+    private static final int TIMEOUT = 10;
+
+    private VirtualNetworkManager manager;
+    private DistributedVirtualNetworkStore virtualNetworkManagerStore;
+    private TestableIntentService intentService = new FakeIntentManager();
+    private ServiceDirectory testDirectory;
+    private VirtualNetworkFlowRuleStore flowRuleStore;
+    private VirtualProviderRegistryService providerRegistryService;
+
+    private VirtualNetworkFlowRuleManager vnetFlowRuleService1;
+    private VirtualNetworkFlowRuleManager vnetFlowRuleService2;
+
+    private VirtualFlowRuleProvider provider = new TestProvider();
+
+    protected TestFlowRuleListener listener1 = new TestFlowRuleListener();
+    protected TestFlowRuleListener listener2 = new TestFlowRuleListener();
+
+    private final TenantId tid1 = TenantId.tenantId("tid1");
+    private final TenantId tid2 = TenantId.tenantId("tid2");
+
+    private VirtualNetwork vnet1;
+    private VirtualNetwork vnet2;
+
+    private ApplicationId appId;
+
+
+    @Before
+    public void setUp() throws Exception {
+        virtualNetworkManagerStore = new DistributedVirtualNetworkStore();
+
+        CoreService coreService = new TestCoreService();
+        virtualNetworkManagerStore.setCoreService(coreService);
+        TestUtils.setField(virtualNetworkManagerStore, "storageService", new TestStorageService());
+        virtualNetworkManagerStore.activate();
+
+        flowRuleStore = new TestVirtualFlowRuleStore();
+
+        providerRegistryService = new VirtualProviderManager();
+        providerRegistryService.registerProvider(provider);
+
+        manager = new VirtualNetworkManager();
+        manager.store = virtualNetworkManagerStore;
+        manager.intentService = intentService;
+        TestUtils.setField(manager, "coreService", coreService);
+        NetTestTools.injectEventDispatcher(manager, new TestEventDispatcher());
+        manager.activate();
+
+        appId = new TestApplicationId("FlowRuleManagerTest");
+
+
+        testDirectory = new TestServiceDirectory()
+                .add(VirtualNetworkStore.class, virtualNetworkManagerStore)
+                .add(CoreService.class, coreService)
+                .add(VirtualProviderRegistryService.class, providerRegistryService)
+                .add(VirtualNetworkFlowRuleStore.class, flowRuleStore);
+
+        BaseResource.setServiceDirectory(testDirectory);
+
+        vnet1 = setupVirtualNetworkTopology(tid1);
+        vnet2 = setupVirtualNetworkTopology(tid2);
+
+        vnetFlowRuleService1 = new VirtualNetworkFlowRuleManager(manager, vnet1, testDirectory);
+        vnetFlowRuleService2 = new VirtualNetworkFlowRuleManager(manager, vnet2, testDirectory);
+        vnetFlowRuleService1.addListener(listener1);
+
+        vnetFlowRuleService1.operationsService = MoreExecutors.newDirectExecutorService();
+        vnetFlowRuleService2.operationsService = MoreExecutors.newDirectExecutorService();
+        vnetFlowRuleService1.deviceInstallers = MoreExecutors.newDirectExecutorService();
+        vnetFlowRuleService2.deviceInstallers = MoreExecutors.newDirectExecutorService();
+    }
+
+    @After
+    public void tearDown() {
+        manager.deactivate();
+        virtualNetworkManagerStore.deactivate();
+    }
+
+    /**
+     * Method to create the virtual network for further testing.
+     *
+     * @return virtual network
+     */
+    private VirtualNetwork setupVirtualNetworkTopology(TenantId tenantId) {
+        manager.registerTenantId(tenantId);
+        VirtualNetwork virtualNetwork = manager.createVirtualNetwork(tenantId);
+
+        VirtualDevice virtualDevice1 =
+                manager.createVirtualDevice(virtualNetwork.id(), DID1);
+        VirtualDevice virtualDevice2 =
+                manager.createVirtualDevice(virtualNetwork.id(), DID2);
+        VirtualDevice virtualDevice3 =
+                manager.createVirtualDevice(virtualNetwork.id(), DID3);
+        VirtualDevice virtualDevice4 =
+                manager.createVirtualDevice(virtualNetwork.id(), DID4);
+
+        ConnectPoint cp1 = new ConnectPoint(virtualDevice1.id(), PortNumber.portNumber(1));
+        manager.createVirtualPort(virtualNetwork.id(), cp1.deviceId(), cp1.port(), cp1);
+
+        ConnectPoint cp2 = new ConnectPoint(virtualDevice1.id(), PortNumber.portNumber(2));
+        manager.createVirtualPort(virtualNetwork.id(), cp2.deviceId(), cp2.port(), cp2);
+
+        ConnectPoint cp3 = new ConnectPoint(virtualDevice2.id(), PortNumber.portNumber(3));
+        manager.createVirtualPort(virtualNetwork.id(), cp3.deviceId(), cp3.port(), cp3);
+
+        ConnectPoint cp4 = new ConnectPoint(virtualDevice2.id(), PortNumber.portNumber(4));
+        manager.createVirtualPort(virtualNetwork.id(), cp4.deviceId(), cp4.port(), cp4);
+
+        ConnectPoint cp5 = new ConnectPoint(virtualDevice3.id(), PortNumber.portNumber(5));
+        manager.createVirtualPort(virtualNetwork.id(), cp5.deviceId(), cp5.port(), cp5);
+
+        ConnectPoint cp6 = new ConnectPoint(virtualDevice3.id(), PortNumber.portNumber(6));
+        manager.createVirtualPort(virtualNetwork.id(), cp6.deviceId(), cp6.port(), cp6);
+
+        VirtualLink link1 = manager.createVirtualLink(virtualNetwork.id(), cp1, cp3);
+        virtualNetworkManagerStore.updateLink(link1, link1.tunnelId(), Link.State.ACTIVE);
+        VirtualLink link2 = manager.createVirtualLink(virtualNetwork.id(), cp3, cp1);
+        virtualNetworkManagerStore.updateLink(link2, link2.tunnelId(), Link.State.ACTIVE);
+        VirtualLink link3 = manager.createVirtualLink(virtualNetwork.id(), cp4, cp5);
+        virtualNetworkManagerStore.updateLink(link3, link3.tunnelId(), Link.State.ACTIVE);
+        VirtualLink link4 = manager.createVirtualLink(virtualNetwork.id(), cp5, cp4);
+        virtualNetworkManagerStore.updateLink(link4, link4.tunnelId(), Link.State.ACTIVE);
+        VirtualLink link5 = manager.createVirtualLink(virtualNetwork.id(), cp2, cp6);
+        virtualNetworkManagerStore.updateLink(link5, link5.tunnelId(), Link.State.ACTIVE);
+        VirtualLink link6 = manager.createVirtualLink(virtualNetwork.id(), cp6, cp2);
+        virtualNetworkManagerStore.updateLink(link6, link6.tunnelId(), Link.State.ACTIVE);
+
+        return virtualNetwork;
+    }
+
+    private FlowRule flowRule(int tsval, int trval) {
+        return flowRule(DID1, tsval, trval);
+    }
+
+    private FlowRule flowRule(DeviceId did, int tsval, int trval) {
+        TestSelector ts = new TestSelector(tsval);
+        TestTreatment tr = new TestTreatment(trval);
+        return DefaultFlowRule.builder()
+                .forDevice(did)
+                .withSelector(ts)
+                .withTreatment(tr)
+                .withPriority(10)
+                .fromApp(appId)
+                .makeTemporary(TIMEOUT)
+                .build();
+    }
+
+    private FlowRule addFlowRule(int hval) {
+        FlowRule rule = flowRule(hval, hval);
+        vnetFlowRuleService1.applyFlowRules(rule);
+
+        assertNotNull("rule should be found", vnetFlowRuleService1.getFlowEntries(DID1));
+        return rule;
+    }
+
+    private int flowCount(FlowRuleService service) {
+        List<FlowEntry> entries = Lists.newArrayList();
+        service.getFlowEntries(DID1).forEach(entries::add);
+        return entries.size();
+    }
+
+    @Test
+    public void getFlowEntries() {
+        assertTrue("store should be empty",
+                   Sets.newHashSet(vnetFlowRuleService1.getFlowEntries(DID1)).isEmpty());
+        assertTrue("store should be empty",
+                   Sets.newHashSet(vnetFlowRuleService2.getFlowEntries(DID1)).isEmpty());
+        FlowRule f1 = addFlowRule(1);
+        FlowRule f2 = addFlowRule(2);
+
+        FlowEntry fe1 = new DefaultFlowEntry(f1);
+        FlowEntry fe2 = new DefaultFlowEntry(f2);
+
+        assertEquals("2 rules should exist", 2, flowCount(vnetFlowRuleService1));
+        assertEquals("0 rules should exist", 0, flowCount(vnetFlowRuleService2));
+    }
+
+    private class TestSelector implements TrafficSelector {
+
+        //for controlling hashcode uniqueness;
+        private final int testval;
+
+        public TestSelector(int val) {
+            testval = val;
+        }
+
+        @Override
+        public Set<Criterion> criteria() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public Criterion getCriterion(
+                org.onosproject.net.flow.criteria.Criterion.Type type) {
+            return null;
+        }
+
+        @Override
+        public int hashCode() {
+            return testval;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o instanceof TestSelector) {
+                return this.testval == ((TestSelector) o).testval;
+            }
+            return false;
+        }
+
+    }
+
+    private class TestTreatment implements TrafficTreatment {
+
+        //for controlling hashcode uniqueness;
+        private final int testval;
+
+        public TestTreatment(int val) {
+            testval = val;
+        }
+
+        @Override
+        public List<Instruction> deferred() {
+            return null;
+        }
+
+        @Override
+        public List<Instruction> immediate() {
+            return null;
+        }
+
+        @Override
+        public List<Instruction> allInstructions() {
+            return null;
+        }
+
+        @Override
+        public Instructions.TableTypeTransition tableTransition() {
+            return null;
+        }
+
+        @Override
+        public boolean clearedDeferred() {
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return testval;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o instanceof TestTreatment) {
+                return this.testval == ((TestTreatment) o).testval;
+            }
+            return false;
+        }
+
+        @Override
+        public Instructions.MetadataInstruction writeMetadata() {
+            return null;
+        }
+
+        @Override
+        public Instructions.MeterInstruction metered() {
+            return null;
+        }
+    }
+
+    private class TestVirtualFlowRuleStore implements VirtualNetworkFlowRuleStore {
+
+        private final ConcurrentMap<NetworkId,
+                ConcurrentMap<DeviceId, ConcurrentMap<FlowId, List<StoredFlowEntry>>>>
+                flowEntries = new ConcurrentHashMap<>();
+
+        @Override
+        public void setDelegate(FlowRuleStoreDelegate delegate) {
+
+        }
+
+        @Override
+        public void unsetDelegate(FlowRuleStoreDelegate delegate) {
+
+        }
+
+        @Override
+        public boolean hasDelegate() {
+            return false;
+        }
+
+        @Override
+        public int getFlowRuleCount(NetworkId networkId) {
+            return 0;
+        }
+
+        @Override
+        public FlowEntry getFlowEntry(NetworkId networkId, FlowRule rule) {
+            return null;
+        }
+
+        @Override
+        public Iterable<FlowEntry> getFlowEntries(NetworkId networkId, DeviceId deviceId) {
+            HashSet<FlowEntry> entries = Sets.newHashSet();
+
+            if (flowEntries.get(networkId) == null
+                    || flowEntries.get(networkId).get(deviceId) == null) {
+                return entries;
+            }
+
+            flowEntries.get(networkId).get(deviceId).values().forEach(e -> entries.addAll(e));
+
+            return entries;
+        }
+
+        @Override
+        public void storeFlowRule(NetworkId networkId, FlowRule rule) {
+            StoredFlowEntry entry = new DefaultFlowEntry(rule);
+            flowEntries.putIfAbsent(networkId, new ConcurrentHashMap<>());
+            flowEntries.get(networkId).putIfAbsent(rule.deviceId(), new ConcurrentHashMap<>());
+            flowEntries.get(networkId).get(rule.deviceId()).putIfAbsent(rule.id(), Lists.newArrayList());
+            flowEntries.get(networkId).get(rule.deviceId()).get(rule.id()).add(entry);
+        }
+
+        @Override
+        public void storeBatch(NetworkId networkId, FlowRuleBatchOperation batchOperation) {
+            for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
+                final FlowRule flowRule = entry.target();
+                if (entry.operator().equals(FlowRuleBatchEntry.FlowRuleOperation.ADD)) {
+                    storeFlowRule(networkId, flowRule);
+                } else if (entry.operator().equals(FlowRuleBatchEntry.FlowRuleOperation.REMOVE)) {
+                    deleteFlowRule(networkId, flowRule);
+                } else {
+                    throw new UnsupportedOperationException("Unsupported operation type");
+                }
+            }
+        }
+
+        @Override
+        public void batchOperationComplete(NetworkId networkId, FlowRuleBatchEvent event) {
+
+        }
+
+        @Override
+        public void deleteFlowRule(NetworkId networkId, FlowRule rule) {
+
+        }
+
+        @Override
+        public FlowRuleEvent addOrUpdateFlowRule(NetworkId networkId, FlowEntry rule) {
+            return null;
+        }
+
+        @Override
+        public FlowRuleEvent removeFlowRule(NetworkId networkId, FlowEntry rule) {
+            return null;
+        }
+
+        @Override
+        public FlowRuleEvent pendingFlowRule(NetworkId networkId, FlowEntry rule) {
+            return null;
+        }
+
+        @Override
+        public void purgeFlowRules(NetworkId networkId) {
+
+        }
+
+        @Override
+        public FlowRuleEvent updateTableStatistics(NetworkId networkId,
+                                                   DeviceId deviceId,
+                                                   List<TableStatisticsEntry> tableStats) {
+            return null;
+        }
+
+        @Override
+        public Iterable<TableStatisticsEntry>
+        getTableStatistics(NetworkId networkId, DeviceId deviceId) {
+            return null;
+        }
+    }
+
+    private void validateEvents(TestFlowRuleListener listener, FlowRuleEvent.Type... events) {
+        if (events == null) {
+            assertTrue("events generated", listener.events.isEmpty());
+        }
+
+        int i = 0;
+        System.err.println("events :" + listener.events);
+        for (FlowRuleEvent e : listener.events) {
+            assertEquals("unexpected event", events[i], e.type());
+            i++;
+        }
+
+        assertEquals("mispredicted number of events",
+                     events.length, listener.events.size());
+
+        listener.events.clear();
+    }
+
+    private class TestFlowRuleListener implements FlowRuleListener {
+
+        public final List<FlowRuleEvent> events = new ArrayList<>();
+
+        @Override
+        public void event(FlowRuleEvent event) {
+           events.add(event);
+        }
+    }
+
+    private class TestProvider extends AbstractVirtualProvider
+            implements VirtualFlowRuleProvider {
+
+        protected TestProvider() {
+            super(new ProviderId("test", "org.onosproject.virtual.testprovider"));
+        }
+
+        @Override
+        public void applyFlowRule(NetworkId networkId, FlowRule... flowRules) {
+
+        }
+
+        @Override
+        public void removeFlowRule(NetworkId networkId, FlowRule... flowRules) {
+
+        }
+
+        @Override
+        public void executeBatch(NetworkId networkId, FlowRuleBatchOperation batch) {
+
+        }
+    }
+}
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java
index 98bad26..3a580eb 100644
--- a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManagerTest.java
@@ -38,7 +38,7 @@
 import org.onosproject.incubator.net.virtual.VirtualNetworkEvent;
 import org.onosproject.incubator.net.virtual.VirtualNetworkIntent;
 import org.onosproject.incubator.net.virtual.VirtualNetworkListener;
-import org.onosproject.incubator.net.virtual.VirtualNetworkProviderService;
+import org.onosproject.incubator.net.virtual.provider.VirtualNetworkProviderService;
 import org.onosproject.incubator.net.virtual.VirtualPort;
 import org.onosproject.incubator.net.virtual.impl.provider.DefaultVirtualNetworkProvider;
 import org.onosproject.incubator.store.virtual.impl.DistributedVirtualNetworkStore;
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/provider/VirtualNetworkTopologyProviderTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/provider/VirtualNetworkTopologyProviderTest.java
index 882aef5..d23d132 100644
--- a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/provider/VirtualNetworkTopologyProviderTest.java
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/provider/VirtualNetworkTopologyProviderTest.java
@@ -31,9 +31,9 @@
 import org.onosproject.incubator.net.virtual.VirtualDevice;
 import org.onosproject.incubator.net.virtual.VirtualLink;
 import org.onosproject.incubator.net.virtual.VirtualNetwork;
-import org.onosproject.incubator.net.virtual.VirtualNetworkProvider;
-import org.onosproject.incubator.net.virtual.VirtualNetworkProviderRegistry;
-import org.onosproject.incubator.net.virtual.VirtualNetworkProviderService;
+import org.onosproject.incubator.net.virtual.provider.VirtualNetworkProvider;
+import org.onosproject.incubator.net.virtual.provider.VirtualNetworkProviderRegistry;
+import org.onosproject.incubator.net.virtual.provider.VirtualNetworkProviderService;
 import org.onosproject.incubator.net.virtual.impl.VirtualNetworkManager;
 import org.onosproject.incubator.store.virtual.impl.DistributedVirtualNetworkStore;
 import org.onosproject.net.ConnectPoint;