[ONOS-5920] (vCore) Create Meter Manager and Store

Changes.
1. Virtual Meter Manager is added
2. Virtual Meter Store Interface is added
3. Simple meter store is implemented (WIP)
4. Unit tests for meter manager and store are added

Change-Id: I5c936617765a48bbdde417eab60270a1d16f9e77
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkMeterStore.java b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkMeterStore.java
new file mode 100644
index 0000000..954f66b
--- /dev/null
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/VirtualNetworkMeterStore.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.net.virtual;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterEvent;
+import org.onosproject.net.meter.MeterFailReason;
+import org.onosproject.net.meter.MeterFeatures;
+import org.onosproject.net.meter.MeterFeaturesKey;
+import org.onosproject.net.meter.MeterKey;
+import org.onosproject.net.meter.MeterOperation;
+import org.onosproject.net.meter.MeterStoreDelegate;
+import org.onosproject.net.meter.MeterStoreResult;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+public interface VirtualNetworkMeterStore
+        extends VirtualStore<MeterEvent, MeterStoreDelegate> {
+
+    /**
+     * Adds a meter to the store.
+     *
+     * @param networkId a virtual network identifier
+     * @param meter a meter
+     * @return a future indicating the result of the store operation
+     */
+    CompletableFuture<MeterStoreResult> storeMeter(NetworkId networkId, Meter meter);
+
+    /**
+     * Deletes a meter from the store.
+     *
+     * @param networkId a virtual network identifier
+     * @param meter a meter
+     * @return a future indicating the result of the store operation
+     */
+    CompletableFuture<MeterStoreResult> deleteMeter(NetworkId networkId, Meter meter);
+
+
+    /**
+     * Adds the meter features to the store.
+     *
+     * @param networkId a virtual network identifier
+     * @param meterfeatures the meter features
+     * @return the result of the store operation
+     */
+    MeterStoreResult storeMeterFeatures(NetworkId networkId, MeterFeatures meterfeatures);
+
+    /**
+     * Deletes the meter features from the store.
+     *
+     * @param networkId a virtual network identifier
+     * @param deviceId the device id
+     * @return a future indicating the result of the store operation
+     */
+    MeterStoreResult deleteMeterFeatures(NetworkId networkId, DeviceId deviceId);
+
+
+    /**
+     * Updates a meter whose meter id is the same as the passed meter.
+     *
+     * @param networkId a virtual network identifier
+     * @param meter a new meter
+     * @return a future indicating the result of the store operation
+     */
+    CompletableFuture<MeterStoreResult> updateMeter(NetworkId networkId, Meter meter);
+
+    /**
+     * Updates a given meter's state with the provided state.
+     *
+     * @param networkId a virtual network identifier
+     * @param meter a meter
+     */
+    void updateMeterState(NetworkId networkId, Meter meter);
+
+    /**
+     * Obtains a meter matching the given meter key.
+     *
+     * @param networkId a virtual network identifier
+     * @param key a meter key
+     * @return a meter
+     */
+    Meter getMeter(NetworkId networkId, MeterKey key);
+
+    /**
+     * Returns all meters stored in the store.
+     *
+     * @param networkId a virtual network identifier
+     * @return a collection of meters
+     */
+    Collection<Meter> getAllMeters(NetworkId networkId);
+
+    /**
+     * Update the store by deleting the failed meter.
+     * Notifies the delegate that the meter failed to allow it
+     * to nofity the app.
+     *
+     * @param networkId a virtual network identifier
+     * @param op a failed meter operation
+     * @param reason a failure reason
+     */
+    void failedMeter(NetworkId networkId, MeterOperation op, MeterFailReason reason);
+
+    /**
+     * Delete this meter immediately.
+     *
+     * @param networkId a virtual network identifier
+     * @param m a meter
+     */
+    void deleteMeterNow(NetworkId networkId, Meter m);
+
+    /**
+     * Retrieve maximum meters available for the device.
+     *
+     * @param networkId a virtual network identifier
+     * @param key the meter features key
+     * @return the maximum number of meters supported by the device
+     */
+    long getMaxMeters(NetworkId networkId, MeterFeaturesKey key);
+}
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualMeterProviderService.java b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualMeterProviderService.java
index f50ad05..46e4a7d 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualMeterProviderService.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/virtual/provider/VirtualMeterProviderService.java
@@ -19,6 +19,7 @@
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.meter.Meter;
 import org.onosproject.net.meter.MeterFailReason;
+import org.onosproject.net.meter.MeterFeatures;
 import org.onosproject.net.meter.MeterOperation;
 
 import java.util.Collection;
@@ -39,11 +40,27 @@
     void meterOperationFailed(MeterOperation operation, MeterFailReason reason);
 
     /**
-     * Pushes the collection of meters observed on the virtual data plane as
+     * Pushes the collection of meters observed on the data plane as
      * well as their associated statistics.
      *
      * @param deviceId a device id
      * @param meterEntries a collection of meter entries
      */
     void pushMeterMetrics(DeviceId deviceId, Collection<Meter> meterEntries);
+
+    /**
+     * Pushes the meter features collected from the device.
+     *
+     * @param deviceId the device Id
+     * @param meterfeatures the meter features
+     */
+    void pushMeterFeatures(DeviceId deviceId,
+                           MeterFeatures meterfeatures);
+
+    /**
+     * Delete meter features collected from the device.
+     *
+     * @param deviceId the device id
+     */
+    void deleteMeterFeatures(DeviceId deviceId);
 }
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManager.java
index c7ccf52..20b6ceb 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkManager.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015-present Open Networking Foundation
+ * Copyright 2017-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -67,6 +67,7 @@
 import org.onosproject.net.host.HostService;
 import org.onosproject.net.intent.IntentService;
 import org.onosproject.net.link.LinkService;
+import org.onosproject.net.meter.MeterService;
 import org.onosproject.net.packet.PacketService;
 import org.onosproject.net.provider.AbstractListenerProviderRegistry;
 import org.onosproject.net.provider.AbstractProviderService;
@@ -431,6 +432,8 @@
             service = new VirtualNetworkPacketManager(this, network.id());
         } else if (serviceKey.serviceClass.equals(GroupService.class)) {
             service = new VirtualNetworkGroupManager(this, network.id());
+        } else if (serviceKey.serviceClass.equals(MeterService.class)) {
+            service = new VirtualNetworkMeterManager(this, network.id());
         } else if (serviceKey.serviceClass.equals(FlowObjectiveService.class)) {
             service = new VirtualNetworkFlowObjectiveManager(this, network.id());
         } else if (serviceKey.serviceClass.equals(MastershipService.class) ||
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkMeterManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkMeterManager.java
new file mode 100644
index 0000000..d661719
--- /dev/null
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkMeterManager.java
@@ -0,0 +1,293 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.net.virtual.impl;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.TriConsumer;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkMeterStore;
+import org.onosproject.incubator.net.virtual.VirtualNetworkService;
+import org.onosproject.incubator.net.virtual.event.AbstractVirtualListenerManager;
+import org.onosproject.incubator.net.virtual.provider.AbstractVirtualProviderService;
+import org.onosproject.incubator.net.virtual.provider.VirtualMeterProvider;
+import org.onosproject.incubator.net.virtual.provider.VirtualMeterProviderService;
+import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.meter.DefaultMeter;
+import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterEvent;
+import org.onosproject.net.meter.MeterFailReason;
+import org.onosproject.net.meter.MeterFeatures;
+import org.onosproject.net.meter.MeterFeaturesKey;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterKey;
+import org.onosproject.net.meter.MeterListener;
+import org.onosproject.net.meter.MeterOperation;
+import org.onosproject.net.meter.MeterRequest;
+import org.onosproject.net.meter.MeterService;
+import org.onosproject.net.meter.MeterState;
+import org.onosproject.net.meter.MeterStoreDelegate;
+import org.onosproject.net.meter.MeterStoreResult;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+public class VirtualNetworkMeterManager
+        extends AbstractVirtualListenerManager<MeterEvent, MeterListener>
+        implements MeterService {
+
+    private static final String METERCOUNTERIDENTIFIER = "meter-id-counter-%s";
+    private final Logger log = getLogger(getClass());
+
+    protected StorageService coreStorageService;
+
+    protected VirtualNetworkMeterStore store;
+    private final MeterStoreDelegate storeDelegate = new InternalMeterStoreDelegate();
+
+    private VirtualProviderRegistryService providerRegistryService;
+    private InternalMeterProviderService innerProviderService;
+
+    private Map<DeviceId, AtomicCounter> meterIdCounters
+            = Maps.newConcurrentMap();
+
+    private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
+
+    /**
+     * Creates a new VirtualNetworkMeterManager object.
+     *
+     * @param manager virtual network manager service
+     * @param networkId a virtual network identifier
+     */
+    public VirtualNetworkMeterManager(VirtualNetworkService manager,
+                                      NetworkId networkId) {
+        super(manager, networkId, MeterEvent.class);
+
+        coreStorageService = serviceDirectory.get(StorageService.class);
+        providerRegistryService =
+                serviceDirectory.get(VirtualProviderRegistryService.class);
+
+        store = serviceDirectory.get(VirtualNetworkMeterStore.class);
+        store.setDelegate(networkId, this.storeDelegate);
+
+        innerProviderService = new InternalMeterProviderService();
+        providerRegistryService.registerProviderService(networkId(), innerProviderService);
+
+
+        onComplete = (request, result, error) -> {
+            request.context().ifPresent(c -> {
+                if (error != null) {
+                    c.onError(request, MeterFailReason.UNKNOWN);
+                } else {
+                    if (result.reason().isPresent()) {
+                        c.onError(request, result.reason().get());
+                    } else {
+                        c.onSuccess(request);
+                    }
+                }
+            });
+
+        };
+
+        log.info("Started");
+    }
+
+    @Override
+    public Meter submit(MeterRequest request) {
+
+        MeterId id = allocateMeterId(request.deviceId());
+
+        Meter.Builder mBuilder = DefaultMeter.builder()
+                .forDevice(request.deviceId())
+                .fromApp(request.appId())
+                .withBands(request.bands())
+                .withId(id)
+                .withUnit(request.unit());
+
+        if (request.isBurst()) {
+            mBuilder.burst();
+        }
+        DefaultMeter m = (DefaultMeter) mBuilder.build();
+        m.setState(MeterState.PENDING_ADD);
+        store.storeMeter(networkId(), m).whenComplete((result, error) ->
+                                                 onComplete.accept(request, result, error));
+        return m;
+    }
+
+    @Override
+    public void withdraw(MeterRequest request, MeterId meterId) {
+        Meter.Builder mBuilder = DefaultMeter.builder()
+                .forDevice(request.deviceId())
+                .fromApp(request.appId())
+                .withBands(request.bands())
+                .withId(meterId)
+                .withUnit(request.unit());
+
+        if (request.isBurst()) {
+            mBuilder.burst();
+        }
+
+        DefaultMeter m = (DefaultMeter) mBuilder.build();
+        m.setState(MeterState.PENDING_REMOVE);
+        store.deleteMeter(networkId(), m).whenComplete((result, error) ->
+                                                  onComplete.accept(request, result, error));
+    }
+
+    @Override
+    public Meter getMeter(DeviceId deviceId, MeterId id) {
+        MeterKey key = MeterKey.key(deviceId, id);
+        return store.getMeter(networkId(), key);
+    }
+
+    @Override
+    public Collection<Meter> getMeters(DeviceId deviceId) {
+        return store.getAllMeters(networkId()).stream()
+                .filter(m -> m.deviceId().equals(deviceId)).collect(Collectors.toList());
+    }
+
+    @Override
+    public Collection<Meter> getAllMeters() {
+        return store.getAllMeters(networkId());
+    }
+
+    private long queryMeters(DeviceId device) {
+        //FIXME: how to decide maximum number of meters per virtual device?
+        return 1;
+    }
+
+    private AtomicCounter allocateCounter(DeviceId deviceId) {
+        return coreStorageService
+                .getAtomicCounter(String.format(METERCOUNTERIDENTIFIER, deviceId));
+    }
+
+    private MeterId allocateMeterId(DeviceId deviceId) {
+        long maxMeters = store.getMaxMeters(networkId(), MeterFeaturesKey.key(deviceId));
+        if (maxMeters == 0L) {
+            // MeterFeatures couldn't be retrieved, trying with queryMeters
+            maxMeters = queryMeters(deviceId);
+        }
+
+        if (maxMeters == 0L) {
+            throw new IllegalStateException("Meters not supported by device " + deviceId);
+        }
+
+        final long mmeters = maxMeters;
+        long id = meterIdCounters.compute(deviceId, (k, v) -> {
+            if (v == null) {
+                return allocateCounter(k);
+            }
+            if (v.get() >= mmeters) {
+                throw new IllegalStateException("Maximum number of meters " +
+                                                        meterIdCounters.get(deviceId).get() +
+                                                        " reached for device " + deviceId +
+                                                        " virtual network " + networkId());
+            }
+            return v;
+        }).incrementAndGet();
+
+        return MeterId.meterId(id);
+    }
+
+    private class InternalMeterProviderService
+            extends AbstractVirtualProviderService<VirtualMeterProvider>
+            implements VirtualMeterProviderService {
+
+        /**
+         * Creates a provider service on behalf of the specified provider.
+         */
+        protected InternalMeterProviderService() {
+            Set<ProviderId> providerIds =
+                    providerRegistryService.getProvidersByService(this);
+            ProviderId providerId = providerIds.stream().findFirst().get();
+            VirtualMeterProvider provider = (VirtualMeterProvider)
+                    providerRegistryService.getProvider(providerId);
+            setProvider(provider);
+        }
+
+        @Override
+        public void meterOperationFailed(MeterOperation operation,
+                                         MeterFailReason reason) {
+            store.failedMeter(networkId(), operation, reason);
+        }
+
+        @Override
+        public void pushMeterMetrics(DeviceId deviceId, Collection<Meter> meterEntries) {
+            //FIXME: FOLLOWING CODE CANNOT BE TESTED UNTIL SOMETHING THAT
+            //FIXME: IMPLEMENTS METERS EXISTS
+            Map<Pair<DeviceId, MeterId>, Meter> storedMeterMap =
+                    store.getAllMeters(networkId()).stream()
+                    .collect(Collectors.toMap(m -> Pair.of(m.deviceId(), m.id()), Function.identity()));
+
+            meterEntries.stream()
+                    .filter(m -> storedMeterMap.remove(Pair.of(m.deviceId(), m.id())) != null)
+                    .forEach(m -> store.updateMeterState(networkId(), m));
+
+            storedMeterMap.values().forEach(m -> {
+                if (m.state() == MeterState.PENDING_ADD) {
+                    provider().performMeterOperation(networkId(), m.deviceId(),
+                                                     new MeterOperation(m,
+                                                                        MeterOperation.Type.MODIFY));
+                } else if (m.state() == MeterState.PENDING_REMOVE) {
+                    store.deleteMeterNow(networkId(), m);
+                }
+            });
+        }
+
+        @Override
+        public void pushMeterFeatures(DeviceId deviceId, MeterFeatures meterfeatures) {
+            store.storeMeterFeatures(networkId(), meterfeatures);
+        }
+
+        @Override
+        public void deleteMeterFeatures(DeviceId deviceId) {
+            store.deleteMeterFeatures(networkId(), deviceId);
+        }
+    }
+
+    private class InternalMeterStoreDelegate implements MeterStoreDelegate {
+
+        @Override
+        public void notify(MeterEvent event) {
+            DeviceId deviceId = event.subject().deviceId();
+            VirtualMeterProvider p = innerProviderService.provider();
+
+            switch (event.type()) {
+                case METER_ADD_REQ:
+                    p.performMeterOperation(networkId(), deviceId,
+                                            new MeterOperation(event.subject(),
+                                                               MeterOperation.Type.ADD));
+                    break;
+                case METER_REM_REQ:
+                    p.performMeterOperation(networkId(), deviceId,
+                                            new MeterOperation(event.subject(),
+                                                               MeterOperation.Type.REMOVE));
+                    break;
+                default:
+                    log.warn("Unknown meter event {}", event.type());
+            }
+        }
+    }
+}
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkMeterManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkMeterManagerTest.java
new file mode 100644
index 0000000..71c4f90
--- /dev/null
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/virtual/impl/VirtualNetworkMeterManagerTest.java
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.net.virtual.impl;
+
+import com.google.common.collect.Maps;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.junit.TestUtils;
+import org.onlab.osgi.ServiceDirectory;
+import org.onlab.osgi.TestServiceDirectory;
+import org.onlab.packet.IpAddress;
+import org.onosproject.TestApplicationId;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.common.event.impl.TestEventDispatcher;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetwork;
+import org.onosproject.incubator.net.virtual.VirtualNetworkMeterStore;
+import org.onosproject.incubator.net.virtual.VirtualNetworkStore;
+import org.onosproject.incubator.net.virtual.event.VirtualListenerRegistryManager;
+import org.onosproject.incubator.net.virtual.impl.provider.VirtualProviderManager;
+import org.onosproject.incubator.net.virtual.provider.AbstractVirtualProvider;
+import org.onosproject.incubator.net.virtual.provider.VirtualMeterProvider;
+import org.onosproject.incubator.net.virtual.provider.VirtualMeterProviderService;
+import org.onosproject.incubator.net.virtual.provider.VirtualProviderRegistryService;
+import org.onosproject.incubator.store.virtual.impl.DistributedVirtualNetworkStore;
+import org.onosproject.incubator.store.virtual.impl.SimpleVirtualMeterStore;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.NetTestTools;
+import org.onosproject.net.intent.FakeIntentManager;
+import org.onosproject.net.intent.TestableIntentService;
+import org.onosproject.net.meter.Band;
+import org.onosproject.net.meter.DefaultBand;
+import org.onosproject.net.meter.DefaultMeter;
+import org.onosproject.net.meter.DefaultMeterFeatures;
+import org.onosproject.net.meter.DefaultMeterRequest;
+import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterFeaturesKey;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterOperation;
+import org.onosproject.net.meter.MeterOperations;
+import org.onosproject.net.meter.MeterRequest;
+import org.onosproject.net.meter.MeterState;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.TestStorageService;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.*;
+
+/**
+ * Virtual Network meter manager tests.
+ */
+public class VirtualNetworkMeterManagerTest extends VirtualNetworkTestUtil {
+    private static final ProviderId PID = new ProviderId("of", "foo");
+    private static final NodeId NID_LOCAL = new NodeId("local");
+    private static final IpAddress LOCALHOST = IpAddress.valueOf("127.0.0.1");
+
+    private VirtualNetworkManager manager;
+    private DistributedVirtualNetworkStore virtualNetworkManagerStore;
+    private TestableIntentService intentService = new FakeIntentManager();
+    private ServiceDirectory testDirectory;
+    private VirtualProviderManager providerRegistryService;
+
+    private EventDeliveryService eventDeliveryService;
+    VirtualListenerRegistryManager listenerRegistryManager =
+            VirtualListenerRegistryManager.getInstance();
+
+    private VirtualNetwork vnet1;
+    private VirtualNetwork vnet2;
+
+    private SimpleVirtualMeterStore meterStore;
+
+    private VirtualNetworkMeterManager meterManager1;
+    private VirtualNetworkMeterManager meterManager2;
+
+    private TestProvider provider = new TestProvider();
+    private VirtualMeterProviderService providerService1;
+    private VirtualMeterProviderService providerService2;
+
+    private ApplicationId appId;
+
+    private Meter m1;
+    private Meter m2;
+    private MeterRequest.Builder m1Request;
+    private MeterRequest.Builder m2Request;
+
+    private Map<MeterId, Meter> meters = Maps.newHashMap();
+
+    @Before
+    public void setUp() throws Exception {
+        virtualNetworkManagerStore = new DistributedVirtualNetworkStore();
+        CoreService coreService = new TestCoreService();
+        TestStorageService storageService = new TestStorageService();
+        TestUtils.setField(virtualNetworkManagerStore, "coreService", coreService);
+        TestUtils.setField(virtualNetworkManagerStore, "storageService", storageService);
+        virtualNetworkManagerStore.activate();
+
+        meterStore = new SimpleVirtualMeterStore();
+
+        providerRegistryService = new VirtualProviderManager();
+        providerRegistryService.registerProvider(provider);
+
+        manager = new VirtualNetworkManager();
+        manager.store = virtualNetworkManagerStore;
+        TestUtils.setField(manager, "coreService", coreService);
+
+        eventDeliveryService = new TestEventDispatcher();
+        NetTestTools.injectEventDispatcher(manager, eventDeliveryService);
+//        eventDeliveryService.addSink(VirtualEvent.class, listenerRegistryManager);
+
+        appId = new TestApplicationId("MeterManagerTest");
+
+        testDirectory = new TestServiceDirectory()
+                .add(VirtualNetworkStore.class, virtualNetworkManagerStore)
+                .add(CoreService.class, coreService)
+                .add(VirtualProviderRegistryService.class, providerRegistryService)
+                .add(EventDeliveryService.class, eventDeliveryService)
+                .add(StorageService.class, storageService)
+                .add(VirtualNetworkMeterStore.class, meterStore);
+        TestUtils.setField(manager, "serviceDirectory", testDirectory);
+
+        manager.activate();
+
+        vnet1 = setupVirtualNetworkTopology(manager, TID1);
+        vnet2 = setupVirtualNetworkTopology(manager, TID2);
+
+        meterManager1 = new VirtualNetworkMeterManager(manager, vnet1.id());
+        meterManager2 = new VirtualNetworkMeterManager(manager, vnet2.id());
+
+        providerService1 = (VirtualMeterProviderService)
+                providerRegistryService.getProviderService(vnet1.id(), VirtualMeterProvider.class);
+        providerService2 = (VirtualMeterProviderService)
+                providerRegistryService.getProviderService(vnet2.id(), VirtualMeterProvider.class);
+
+        assertTrue("provider should be registered",
+                   providerRegistryService.getProviders().contains(provider.id()));
+
+        setupMeterTestVariables();
+    }
+
+    @After
+    public void tearDown() {
+        providerRegistryService.unregisterProvider(provider);
+        assertFalse("provider should not be registered",
+                    providerRegistryService.getProviders().contains(provider.id()));
+
+        manager.deactivate();
+        NetTestTools.injectEventDispatcher(manager, null);
+
+        virtualNetworkManagerStore.deactivate();
+    }
+
+    /** Test for meter submit(). */
+    @Test
+    public void testAddition() {
+        meterManager1.submit(m1Request.add());
+
+        assertTrue("The meter was not added",
+                   meterManager1.getAllMeters().size() == 1);
+        assertThat(meterManager1.getMeter(VDID1, MeterId.meterId(1)), is(m1));
+
+        assertTrue("The meter shouldn't be added for vnet2",
+                   meterManager2.getAllMeters().size() == 0);
+    }
+
+    /** Test for meter remove(). */
+    @Test
+    public void testRemove() {
+        meterManager1.submit(m1Request.add());
+        meterManager1.withdraw(m1Request.remove(), m1.id());
+
+        assertThat(meterManager1.getMeter(VDID1, MeterId.meterId(1)).state(),
+                   is(MeterState.PENDING_REMOVE));
+
+        providerService1.pushMeterMetrics(m1.deviceId(), Collections.emptyList());
+
+        assertTrue("The meter was not removed", meterManager1.getAllMeters().size() == 0);
+        assertTrue("The meter shouldn't be added for vnet2",
+                   meterManager2.getAllMeters().size() == 0);
+    }
+
+    /** Test for meter submit with multiple devices. */
+    @Test
+    public void testMultipleDevice() {
+        meterManager1.submit(m1Request.add());
+        meterManager1.submit(m2Request.add());
+
+        assertTrue("The meters were not added",
+                   meterManager1.getAllMeters().size() == 2);
+        assertTrue("The meter shouldn't be added for vnet2",
+                   meterManager2.getAllMeters().size() == 0);
+
+        assertThat(meterManager1.getMeter(VDID1, MeterId.meterId(1)), is(m1));
+        assertThat(meterManager1.getMeter(VDID2, MeterId.meterId(1)), is(m2));
+    }
+
+    /** Test for meter features inside store. */
+    @Test
+    public void testMeterFeatures() {
+        //Test for virtual network 1
+        assertEquals(meterStore.getMaxMeters(vnet1.id(),
+                                             MeterFeaturesKey.key(VDID1)), 255L);
+        assertEquals(meterStore.getMaxMeters(vnet1.id(),
+                                             MeterFeaturesKey.key(VDID2)), 2);
+        //Test for virtual network 2
+        assertEquals(meterStore.getMaxMeters(vnet2.id(),
+                                             MeterFeaturesKey.key(VDID1)), 100);
+        assertEquals(meterStore.getMaxMeters(vnet2.id(),
+                                             MeterFeaturesKey.key(VDID2)), 10);
+    }
+
+    /** Set variables such as meters and request required for testing. */
+    private void setupMeterTestVariables() {
+        Band band = DefaultBand.builder()
+                .ofType(Band.Type.DROP)
+                .withRate(500)
+                .build();
+
+        m1 = DefaultMeter.builder()
+                .forDevice(VDID1)
+                .fromApp(appId)
+                .withId(MeterId.meterId(1))
+                .withUnit(Meter.Unit.KB_PER_SEC)
+                .withBands(Collections.singletonList(band))
+                .build();
+
+        m2 = DefaultMeter.builder()
+                .forDevice(VDID2)
+                .fromApp(appId)
+                .withId(MeterId.meterId(1))
+                .withUnit(Meter.Unit.KB_PER_SEC)
+                .withBands(Collections.singletonList(band))
+                .build();
+
+        m1Request = DefaultMeterRequest.builder()
+                .forDevice(VDID1)
+                .fromApp(appId)
+                .withUnit(Meter.Unit.KB_PER_SEC)
+                .withBands(Collections.singletonList(band));
+
+        m2Request = DefaultMeterRequest.builder()
+                .forDevice(VDID2)
+                .fromApp(appId)
+                .withUnit(Meter.Unit.KB_PER_SEC)
+                .withBands(Collections.singletonList(band));
+
+        meterStore.storeMeterFeatures(vnet1.id(),
+                                      DefaultMeterFeatures.builder().forDevice(VDID1)
+                                              .withMaxMeters(255L)
+                                              .withBandTypes(new HashSet<>())
+                                              .withUnits(new HashSet<>())
+                                              .hasStats(false)
+                                              .hasBurst(false)
+                                              .withMaxBands((byte) 0)
+                                              .withMaxColors((byte) 0)
+                                              .build());
+        meterStore.storeMeterFeatures(vnet1.id(),
+                                      DefaultMeterFeatures.builder().forDevice(VDID2)
+                                              .withMaxMeters(2)
+                                              .withBandTypes(new HashSet<>())
+                                              .withUnits(new HashSet<>())
+                                              .hasBurst(false)
+                                              .hasStats(false)
+                                              .withMaxBands((byte) 0)
+                                              .withMaxColors((byte) 0)
+                                              .build());
+
+        meterStore.storeMeterFeatures(vnet2.id(),
+                                      DefaultMeterFeatures.builder().forDevice(VDID1)
+                                              .withMaxMeters(100L)
+                                              .withBandTypes(new HashSet<>())
+                                              .withUnits(new HashSet<>())
+                                              .hasStats(false)
+                                              .hasBurst(false)
+                                              .withMaxBands((byte) 0)
+                                              .withMaxColors((byte) 0)
+                                              .build());
+        meterStore.storeMeterFeatures(vnet2.id(),
+                                      DefaultMeterFeatures.builder().forDevice(VDID2)
+                                              .withMaxMeters(10)
+                                              .withBandTypes(new HashSet<>())
+                                              .withUnits(new HashSet<>())
+                                              .hasBurst(false)
+                                              .hasStats(false)
+                                              .withMaxBands((byte) 0)
+                                              .withMaxColors((byte) 0)
+                                              .build());
+    }
+
+    private class TestProvider
+            extends AbstractVirtualProvider
+            implements VirtualMeterProvider {
+
+        protected TestProvider() {
+            super(PID);
+        }
+
+        @Override
+        public void performMeterOperation(NetworkId networkId, DeviceId deviceId,
+                                          MeterOperations meterOps) {
+
+        }
+
+        @Override
+        public void performMeterOperation(NetworkId networkId, DeviceId deviceId,
+                                          MeterOperation meterOp) {
+            meters.put(meterOp.meter().id(), meterOp.meter());
+        }
+    }
+}
\ No newline at end of file
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMeterStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMeterStore.java
new file mode 100644
index 0000000..3b5d122
--- /dev/null
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMeterStore.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.store.virtual.impl;
+
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Maps;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkMeterStore;
+import org.onosproject.incubator.store.meter.impl.MeterData;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.meter.DefaultMeter;
+import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterEvent;
+import org.onosproject.net.meter.MeterFailReason;
+import org.onosproject.net.meter.MeterFeatures;
+import org.onosproject.net.meter.MeterFeaturesKey;
+import org.onosproject.net.meter.MeterKey;
+import org.onosproject.net.meter.MeterOperation;
+import org.onosproject.net.meter.MeterStoreDelegate;
+import org.onosproject.net.meter.MeterStoreResult;
+import org.onosproject.store.service.StorageException;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of the virtual meter store for a single instance.
+ */
+//TODO: support distributed meter store for virtual networks
+
+public class SimpleVirtualMeterStore
+        extends AbstractVirtualStore<MeterEvent, MeterStoreDelegate>
+        implements VirtualNetworkMeterStore {
+
+        private Logger log = getLogger(getClass());
+
+        @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+        protected ClusterService clusterService;
+
+        private ConcurrentMap<NetworkId, ConcurrentMap<MeterKey, MeterData>> meterMap =
+                Maps.newConcurrentMap();
+
+        private NodeId local;
+
+        private ConcurrentMap<NetworkId, ConcurrentMap<MeterFeaturesKey, MeterFeatures>>
+                meterFeatureMap = Maps.newConcurrentMap();
+
+        private ConcurrentMap<NetworkId,
+                ConcurrentMap<MeterKey, CompletableFuture<MeterStoreResult>>> futuresMap =
+                Maps.newConcurrentMap();
+
+        @Activate
+        public void activate() {
+            log.info("Started");
+            local = clusterService.getLocalNode().id();
+        }
+
+        @Deactivate
+        public void deactivate() {
+            log.info("Stopped");
+        }
+
+        private ConcurrentMap<MeterKey, MeterData> getMetersByNetwork(NetworkId networkId) {
+            meterMap.computeIfAbsent(networkId, m -> new ConcurrentHashMap<>());
+            return meterMap.get(networkId);
+        }
+
+        private ConcurrentMap<MeterFeaturesKey, MeterFeatures>
+        getMeterFeaturesByNetwork(NetworkId networkId) {
+            meterFeatureMap.computeIfAbsent(networkId, f -> new ConcurrentHashMap<>());
+            return meterFeatureMap.get(networkId);
+        }
+
+        private ConcurrentMap<MeterKey, CompletableFuture<MeterStoreResult>>
+        getFuturesByNetwork(NetworkId networkId) {
+            futuresMap.computeIfAbsent(networkId, f -> new ConcurrentHashMap<>());
+            return futuresMap.get(networkId);
+        }
+
+        @Override
+        public CompletableFuture<MeterStoreResult> storeMeter(NetworkId networkId, Meter meter) {
+
+            ConcurrentMap<MeterKey, MeterData> meters = getMetersByNetwork(networkId);
+
+            ConcurrentMap<MeterKey, CompletableFuture<MeterStoreResult>> futures =
+                   getFuturesByNetwork(networkId);
+
+            CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
+            MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+            futures.put(key, future);
+            MeterData data = new MeterData(meter, null, local);
+
+            try {
+                    meters.put(key, data);
+            } catch (StorageException e) {
+                    future.completeExceptionally(e);
+            }
+
+            return future;
+        }
+
+        @Override
+        public CompletableFuture<MeterStoreResult> deleteMeter(NetworkId networkId, Meter meter) {
+            ConcurrentMap<MeterKey, MeterData> meters = getMetersByNetwork(networkId);
+
+            ConcurrentMap<MeterKey, CompletableFuture<MeterStoreResult>> futures =
+                    getFuturesByNetwork(networkId);
+
+            CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
+            MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+            futures.put(key, future);
+
+            MeterData data = new MeterData(meter, null, local);
+
+            // update the state of the meter. It will be pruned by observing
+            // that it has been removed from the dataplane.
+            try {
+                    if (meters.computeIfPresent(key, (k, v) -> data) == null) {
+                            future.complete(MeterStoreResult.success());
+                    }
+            } catch (StorageException e) {
+                    future.completeExceptionally(e);
+            }
+
+            return future;
+        }
+
+        @Override
+        public MeterStoreResult storeMeterFeatures(NetworkId networkId, MeterFeatures meterfeatures) {
+            ConcurrentMap<MeterFeaturesKey, MeterFeatures> meterFeatures
+                    = getMeterFeaturesByNetwork(networkId);
+
+            MeterStoreResult result = MeterStoreResult.success();
+            MeterFeaturesKey key = MeterFeaturesKey.key(meterfeatures.deviceId());
+            try {
+                    meterFeatures.putIfAbsent(key, meterfeatures);
+            } catch (StorageException e) {
+                    result = MeterStoreResult.fail(TIMEOUT);
+            }
+            return result;
+        }
+
+        @Override
+        public MeterStoreResult deleteMeterFeatures(NetworkId networkId, DeviceId deviceId) {
+            ConcurrentMap<MeterFeaturesKey, MeterFeatures> meterFeatures
+                    = getMeterFeaturesByNetwork(networkId);
+
+            MeterStoreResult result = MeterStoreResult.success();
+            MeterFeaturesKey key = MeterFeaturesKey.key(deviceId);
+            try {
+                    meterFeatures.remove(key);
+            } catch (StorageException e) {
+                    result = MeterStoreResult.fail(TIMEOUT);
+            }
+            return result;
+        }
+
+        @Override
+        public CompletableFuture<MeterStoreResult> updateMeter(NetworkId networkId, Meter meter) {
+            ConcurrentMap<MeterKey, MeterData> meters = getMetersByNetwork(networkId);
+            ConcurrentMap<MeterKey, CompletableFuture<MeterStoreResult>> futures =
+                    getFuturesByNetwork(networkId);
+
+            CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
+            MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+            futures.put(key, future);
+
+            MeterData data = new MeterData(meter, null, local);
+            try {
+                    if (meters.computeIfPresent(key, (k, v) -> data) == null) {
+                            future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
+                    }
+            } catch (StorageException e) {
+                    future.completeExceptionally(e);
+            }
+            return future;
+        }
+
+        @Override
+        public void updateMeterState(NetworkId networkId, Meter meter) {
+            ConcurrentMap<MeterKey, MeterData> meters = getMetersByNetwork(networkId);
+
+            MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+            meters.computeIfPresent(key, (k, v) -> {
+                    DefaultMeter m = (DefaultMeter) v.meter();
+                    m.setState(meter.state());
+                    m.setProcessedPackets(meter.packetsSeen());
+                    m.setProcessedBytes(meter.bytesSeen());
+                    m.setLife(meter.life());
+                    // TODO: Prune if drops to zero.
+                    m.setReferenceCount(meter.referenceCount());
+                    return new MeterData(m, null, v.origin());
+            });
+        }
+
+        @Override
+        public Meter getMeter(NetworkId networkId, MeterKey key) {
+            ConcurrentMap<MeterKey, MeterData> meters = getMetersByNetwork(networkId);
+
+            MeterData data = meters.get(key);
+            return data == null ? null : data.meter();
+        }
+
+        @Override
+        public Collection<Meter> getAllMeters(NetworkId networkId) {
+            ConcurrentMap<MeterKey, MeterData> meters = getMetersByNetwork(networkId);
+
+            return Collections2.transform(meters.values(), MeterData::meter);
+        }
+
+        @Override
+        public void failedMeter(NetworkId networkId, MeterOperation op, MeterFailReason reason) {
+            ConcurrentMap<MeterKey, MeterData> meters = getMetersByNetwork(networkId);
+
+            MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
+            meters.computeIfPresent(key, (k, v) ->
+                    new MeterData(v.meter(), reason, v.origin()));
+        }
+
+        @Override
+        public void deleteMeterNow(NetworkId networkId, Meter m) {
+            ConcurrentMap<MeterKey, MeterData> meters = getMetersByNetwork(networkId);
+            ConcurrentMap<MeterKey, CompletableFuture<MeterStoreResult>> futures =
+                    getFuturesByNetwork(networkId);
+
+            MeterKey key = MeterKey.key(m.deviceId(), m.id());
+            futures.remove(key);
+            meters.remove(key);
+        }
+
+        @Override
+        public long getMaxMeters(NetworkId networkId, MeterFeaturesKey key) {
+            ConcurrentMap<MeterFeaturesKey, MeterFeatures> meterFeatures
+                    = getMeterFeaturesByNetwork(networkId);
+
+            MeterFeatures features = meterFeatures.get(key);
+            return features == null ? 0L : features.maxMeter();
+        }
+}