Add device-specific counters for meter ids in
the meter service. Meters are now looked up and stored by MeterKey (device id + meter id).
Change-Id: I38d38a0a85024927f5a74013b2b4d9efa9b32d22
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
index 575a715..8c86e01 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.incubator.net.meter.impl;
+import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -27,6 +28,7 @@
import org.onosproject.net.meter.MeterEvent;
import org.onosproject.net.meter.MeterFailReason;
import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterKey;
import org.onosproject.net.meter.MeterListener;
import org.onosproject.net.meter.MeterOperation;
import org.onosproject.net.meter.MeterProvider;
@@ -61,7 +63,7 @@
MeterProvider, MeterProviderService>
implements MeterService, MeterProviderRegistry {
- private final String meterIdentifier = "meter-id-counter";
+ private static final String METERCOUNTERIDENTIFIER = "meter-id-counter-%s";
private final Logger log = getLogger(getClass());
private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate();
@@ -71,15 +73,16 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MeterStore store;
- private AtomicCounter meterIdCounter;
+ private Map<DeviceId, AtomicCounter> meterIdCounters
+ = Maps.newConcurrentMap();
private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
@Activate
public void activate() {
- meterIdCounter = storageService.atomicCounterBuilder()
- .withName(meterIdentifier)
- .build();
+ /*meterIdCounter = storageService.atomicCounterBuilder()
+ .withName(METERCOUNTERIDENTIFIER)
+ .build();*/
store.setDelegate(delegate);
@@ -115,11 +118,13 @@
@Override
public Meter submit(MeterRequest request) {
+ MeterId id = allocateMeterId(request.deviceId());
+
Meter.Builder mBuilder = DefaultMeter.builder()
.forDevice(request.deviceId())
.fromApp(request.appId())
.withBands(request.bands())
- .withId(allocateMeterId())
+ .withId(id)
.withUnit(request.unit());
if (request.isBurst()) {
@@ -152,8 +157,9 @@
}
@Override
- public Meter getMeter(MeterId id) {
- return store.getMeter(id);
+ public Meter getMeter(DeviceId deviceId, MeterId id) {
+ MeterKey key = MeterKey.key(deviceId, id);
+ return store.getMeter(key);
}
@Override
@@ -161,9 +167,21 @@
return store.getAllMeters();
}
- private MeterId allocateMeterId() {
- // FIXME: This will break one day.
- return MeterId.meterId((int) meterIdCounter.incrementAndGet());
+ private MeterId allocateMeterId(DeviceId deviceId) {
+ long id = meterIdCounters.compute(deviceId, (k, v) -> {
+ if (v == null) {
+ return allocateCounter(k);
+ }
+ return v;
+ }).incrementAndGet();
+
+ return MeterId.meterId((int) id);
+ }
+
+ private AtomicCounter allocateCounter(DeviceId deviceId) {
+ return storageService.atomicCounterBuilder()
+ .withName(String.format(METERCOUNTERIDENTIFIER, deviceId))
+ .build();
}
private class InternalMeterProviderService
diff --git a/incubator/net/src/test/java/org/onosproject/incubator/net/meter/impl/MeterManagerTest.java b/incubator/net/src/test/java/org/onosproject/incubator/net/meter/impl/MeterManagerTest.java
index e0c0c86..76caebc 100644
--- a/incubator/net/src/test/java/org/onosproject/incubator/net/meter/impl/MeterManagerTest.java
+++ b/incubator/net/src/test/java/org/onosproject/incubator/net/meter/impl/MeterManagerTest.java
@@ -130,7 +130,7 @@
m2 = DefaultMeter.builder()
.forDevice(did("2"))
.fromApp(APP_ID)
- .withId(MeterId.meterId(2))
+ .withId(MeterId.meterId(1))
.withUnit(Meter.Unit.KB_PER_SEC)
.withBands(Collections.singletonList(band))
.build();
@@ -167,7 +167,7 @@
assertTrue("The meter was not added", manager.getAllMeters().size() == 1);
- assertThat(manager.getMeter(MeterId.meterId(1)), is(m1));
+ assertThat(manager.getMeter(did("1"), MeterId.meterId(1)), is(m1));
}
@Test
@@ -175,7 +175,7 @@
manager.submit(m1Request.add());
manager.withdraw(m1Request.remove(), m1.id());
- assertThat(manager.getMeter(MeterId.meterId(1)).state(),
+ assertThat(manager.getMeter(did("1"), MeterId.meterId(1)).state(),
is(MeterState.PENDING_REMOVE));
providerService.pushMeterMetrics(m1.deviceId(), Collections.emptyList());
@@ -184,7 +184,16 @@
}
+ @Test
+ public void testMultipleDevice() {
+ manager.submit(m1Request.add());
+ manager.submit(m2Request.add());
+ assertTrue("The meters were not added", manager.getAllMeters().size() == 2);
+
+ assertThat(manager.getMeter(did("1"), MeterId.meterId(1)), is(m1));
+ assertThat(manager.getMeter(did("2"), MeterId.meterId(1)), is(m2));
+ }
public class TestApplicationId extends DefaultApplicationId {
public TestApplicationId(int id, String name) {
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
index 32890cb..c09b1ac 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
@@ -33,6 +33,7 @@
import org.onosproject.net.meter.MeterEvent;
import org.onosproject.net.meter.MeterFailReason;
import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterKey;
import org.onosproject.net.meter.MeterOperation;
import org.onosproject.net.meter.MeterState;
import org.onosproject.net.meter.MeterStore;
@@ -78,12 +79,12 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
- private ConsistentMap<MeterId, MeterData> meters;
+ private ConsistentMap<MeterKey, MeterData> meters;
private NodeId local;
private MapEventListener mapListener = new InternalMapEventListener();
- private Map<MeterId, CompletableFuture<MeterStoreResult>> futures =
+ private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
Maps.newConcurrentMap();
@Activate
@@ -92,9 +93,10 @@
local = clusterService.getLocalNode().id();
- meters = storageService.<MeterId, MeterData>consistentMapBuilder()
+ meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
.withName(METERSTORE)
.withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
+ MeterKey.class,
MeterData.class,
DefaultMeter.class,
DefaultBand.class,
@@ -120,11 +122,12 @@
@Override
public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
- futures.put(meter.id(), future);
+ MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+ futures.put(key, future);
MeterData data = new MeterData(meter, null, local);
try {
- meters.put(meter.id(), data);
+ meters.put(key, data);
} catch (StorageException e) {
future.completeExceptionally(e);
}
@@ -136,14 +139,15 @@
@Override
public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
- futures.put(meter.id(), future);
+ MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+ futures.put(key, future);
MeterData data = new MeterData(meter, null, local);
// update the state of the meter. It will be pruned by observing
// that it has been removed from the dataplane.
try {
- if (meters.computeIfPresent(meter.id(), (k, v) -> data) == null) {
+ if (meters.computeIfPresent(key, (k, v) -> data) == null) {
future.complete(MeterStoreResult.success());
}
} catch (StorageException e) {
@@ -157,11 +161,12 @@
@Override
public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
- futures.put(meter.id(), future);
+ MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+ futures.put(key, future);
MeterData data = new MeterData(meter, null, local);
try {
- if (meters.computeIfPresent(meter.id(), (k, v) -> data) == null) {
+ if (meters.computeIfPresent(key, (k, v) -> data) == null) {
future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
}
} catch (StorageException e) {
@@ -172,7 +177,8 @@
@Override
public void updateMeterState(Meter meter) {
- meters.computeIfPresent(meter.id(), (id, v) -> {
+ MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+ meters.computeIfPresent(key, (k, v) -> {
DefaultMeter m = (DefaultMeter) v.meter();
m.setState(meter.state());
m.setProcessedPackets(meter.packetsSeen());
@@ -185,8 +191,8 @@
}
@Override
- public Meter getMeter(MeterId meterId) {
- MeterData data = Versioned.valueOrElse(meters.get(meterId), null);
+ public Meter getMeter(MeterKey key) {
+ MeterData data = Versioned.valueOrElse(meters.get(key), null);
return data == null ? null : data.meter();
}
@@ -198,14 +204,16 @@
@Override
public void failedMeter(MeterOperation op, MeterFailReason reason) {
- meters.computeIfPresent(op.meter().id(), (k, v) ->
+ MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
+ meters.computeIfPresent(key, (k, v) ->
new MeterData(v.meter(), reason, v.origin()));
}
@Override
public void deleteMeterNow(Meter m) {
- futures.remove(m.id());
- meters.remove(m.id());
+ MeterKey key = MeterKey.key(m.deviceId(), m.id());
+ futures.remove(key);
+ meters.remove(key);
}
private class InternalMapEventListener implements MapEventListener<MeterId, MeterData> {