Add device-specific counters for meter ids in
the meter service.

Change-Id: I38d38a0a85024927f5a74013b2b4d9efa9b32d22
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
index 575a715..8c86e01 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.incubator.net.meter.impl;
 
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -27,6 +28,7 @@
 import org.onosproject.net.meter.MeterEvent;
 import org.onosproject.net.meter.MeterFailReason;
 import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterKey;
 import org.onosproject.net.meter.MeterListener;
 import org.onosproject.net.meter.MeterOperation;
 import org.onosproject.net.meter.MeterProvider;
@@ -61,7 +63,7 @@
         MeterProvider, MeterProviderService>
         implements MeterService, MeterProviderRegistry {
 
-    private final String meterIdentifier = "meter-id-counter";
+    private static final String METERCOUNTERIDENTIFIER = "meter-id-counter-%s";
     private final Logger log = getLogger(getClass());
     private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate();
 
@@ -71,15 +73,16 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MeterStore store;
 
-    private AtomicCounter meterIdCounter;
+    private Map<DeviceId, AtomicCounter> meterIdCounters
+            = Maps.newConcurrentMap();
 
     private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
 
     @Activate
     public void activate() {
-        meterIdCounter = storageService.atomicCounterBuilder()
-                .withName(meterIdentifier)
-                .build();
+        // The single global meter id counter is gone: counters are now
+        // created lazily, one per device, the first time a meter id is
+        // allocated for that device (see allocateMeterId).
 
         store.setDelegate(delegate);
 
@@ -115,11 +118,13 @@
     @Override
     public Meter submit(MeterRequest request) {
 
+        MeterId id = allocateMeterId(request.deviceId());
+
         Meter.Builder mBuilder = DefaultMeter.builder()
                 .forDevice(request.deviceId())
                 .fromApp(request.appId())
                 .withBands(request.bands())
-                .withId(allocateMeterId())
+                .withId(id)
                 .withUnit(request.unit());
 
         if (request.isBurst()) {
@@ -152,8 +157,9 @@
     }
 
     @Override
-    public Meter getMeter(MeterId id) {
-        return store.getMeter(id);
+    public Meter getMeter(DeviceId deviceId, MeterId id) {
+        // Meter ids are only unique per device, so look up by the pair.
+        return store.getMeter(MeterKey.key(deviceId, id));
     }
 
     @Override
@@ -161,9 +167,21 @@
         return store.getAllMeters();
     }
 
-    private MeterId allocateMeterId() {
-        // FIXME: This will break one day.
-        return MeterId.meterId((int) meterIdCounter.incrementAndGet());
+    /**
+     * Allocates the next meter id for the device from its own counter,
+     * creating that counter the first time the device is seen.
+     */
+    private MeterId allocateMeterId(DeviceId deviceId) {
+        AtomicCounter counter =
+                meterIdCounters.computeIfAbsent(deviceId, this::allocateCounter);
+        // FIXME: narrowing cast; ids go wrong once a counter exceeds int range.
+        return MeterId.meterId((int) counter.incrementAndGet());
+    }
+
+    private AtomicCounter allocateCounter(DeviceId deviceId) {
+        // The device id is substituted into the name, giving one counter per device.
+        String counterName = String.format(METERCOUNTERIDENTIFIER, deviceId);
+        return storageService.atomicCounterBuilder().withName(counterName).build();
     }
 
     private class InternalMeterProviderService
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/meter/impl/MeterManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/meter/impl/MeterManagerTest.java
index e0c0c86..76caebc 100644
--- a/incubator/net/src/test/java/org/onosproject/incubator/net/meter/impl/MeterManagerTest.java
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/meter/impl/MeterManagerTest.java
@@ -130,7 +130,7 @@
         m2 = DefaultMeter.builder()
                 .forDevice(did("2"))
                 .fromApp(APP_ID)
-                .withId(MeterId.meterId(2))
+                .withId(MeterId.meterId(1))
                 .withUnit(Meter.Unit.KB_PER_SEC)
                 .withBands(Collections.singletonList(band))
                 .build();
@@ -167,7 +167,7 @@
 
         assertTrue("The meter was not added", manager.getAllMeters().size() == 1);
 
-        assertThat(manager.getMeter(MeterId.meterId(1)), is(m1));
+        assertThat(manager.getMeter(did("1"), MeterId.meterId(1)), is(m1));
     }
 
     @Test
@@ -175,7 +175,7 @@
         manager.submit(m1Request.add());
         manager.withdraw(m1Request.remove(), m1.id());
 
-        assertThat(manager.getMeter(MeterId.meterId(1)).state(),
+        assertThat(manager.getMeter(did("1"), MeterId.meterId(1)).state(),
                    is(MeterState.PENDING_REMOVE));
 
         providerService.pushMeterMetrics(m1.deviceId(), Collections.emptyList());
@@ -184,7 +184,16 @@
 
     }
 
+    @Test
+    public void testMultipleDevice() {
+        manager.submit(m1Request.add());
+        manager.submit(m2Request.add());
 
+        assertTrue("The meters were not added", manager.getAllMeters().size() == 2);
+        // Ids are counted per device, so both meters are expected under id 1.
+        assertThat(manager.getMeter(did("1"), MeterId.meterId(1)), is(m1));
+        assertThat(manager.getMeter(did("2"), MeterId.meterId(1)), is(m2));
+    }
 
     public class TestApplicationId extends DefaultApplicationId {
         public TestApplicationId(int id, String name) {
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
index 32890cb..c09b1ac 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
@@ -33,6 +33,7 @@
 import org.onosproject.net.meter.MeterEvent;
 import org.onosproject.net.meter.MeterFailReason;
 import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterKey;
 import org.onosproject.net.meter.MeterOperation;
 import org.onosproject.net.meter.MeterState;
 import org.onosproject.net.meter.MeterStore;
@@ -78,12 +79,12 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private ClusterService clusterService;
 
-    private ConsistentMap<MeterId, MeterData> meters;
+    private ConsistentMap<MeterKey, MeterData> meters;
     private NodeId local;
 
     private MapEventListener mapListener = new InternalMapEventListener();
 
-    private Map<MeterId, CompletableFuture<MeterStoreResult>> futures =
+    private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
             Maps.newConcurrentMap();
 
     @Activate
@@ -92,9 +93,10 @@
         local = clusterService.getLocalNode().id();
 
 
-        meters = storageService.<MeterId, MeterData>consistentMapBuilder()
+        meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
                     .withName(METERSTORE)
                     .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
+                                                     MeterKey.class,
                                                      MeterData.class,
                                                      DefaultMeter.class,
                                                      DefaultBand.class,
@@ -120,11 +122,12 @@
     @Override
     public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
         CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
-        futures.put(meter.id(), future);
+        MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+        futures.put(key, future);
         MeterData data = new MeterData(meter, null, local);
 
         try {
-            meters.put(meter.id(), data);
+            meters.put(key, data);
         } catch (StorageException e) {
             future.completeExceptionally(e);
         }
@@ -136,14 +139,15 @@
     @Override
     public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
         CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
-        futures.put(meter.id(), future);
+        MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+        futures.put(key, future);
 
         MeterData data = new MeterData(meter, null, local);
 
         // update the state of the meter. It will be pruned by observing
         // that it has been removed from the dataplane.
         try {
-            if (meters.computeIfPresent(meter.id(), (k, v) -> data) == null) {
+            if (meters.computeIfPresent(key, (k, v) -> data) == null) {
                 future.complete(MeterStoreResult.success());
             }
         } catch (StorageException e) {
@@ -157,11 +161,12 @@
     @Override
     public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
         CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
-        futures.put(meter.id(), future);
+        MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+        futures.put(key, future);
 
         MeterData data = new MeterData(meter, null, local);
         try {
-            if (meters.computeIfPresent(meter.id(), (k, v) -> data) == null) {
+            if (meters.computeIfPresent(key, (k, v) -> data) == null) {
                 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
             }
         } catch (StorageException e) {
@@ -172,7 +177,8 @@
 
     @Override
     public void updateMeterState(Meter meter) {
-        meters.computeIfPresent(meter.id(), (id, v) -> {
+        MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+        meters.computeIfPresent(key, (k, v) -> {
             DefaultMeter m = (DefaultMeter) v.meter();
             m.setState(meter.state());
             m.setProcessedPackets(meter.packetsSeen());
@@ -185,8 +191,8 @@
     }
 
     @Override
-    public Meter getMeter(MeterId meterId) {
-        MeterData data = Versioned.valueOrElse(meters.get(meterId), null);
+    public Meter getMeter(MeterKey key) {
+        MeterData data = Versioned.valueOrElse(meters.get(key), null);
         return data == null ? null : data.meter();
     }
 
@@ -198,14 +204,16 @@
 
     @Override
     public void failedMeter(MeterOperation op, MeterFailReason reason) {
-        meters.computeIfPresent(op.meter().id(), (k, v) ->
+        MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
+        meters.computeIfPresent(key, (k, v) ->
                 new MeterData(v.meter(), reason, v.origin()));
     }
 
     @Override
     public void deleteMeterNow(Meter m) {
-        futures.remove(m.id());
-        meters.remove(m.id());
+        MeterKey key = MeterKey.key(m.deviceId(), m.id());
+        futures.remove(key);
+        meters.remove(key);
     }
 
     private class InternalMapEventListener implements MapEventListener<MeterId, MeterData> {