[SDFAB-500][SDFAB-499] Implement user defined index mode for the meter service
- Introduce a boolean to control the meter service modes
- User defined mode does not provide any coordination to the apps
- Only one mode can be active at time
- In addition some sanity checks are peformed by the meter service
- Update existing unit tests and add new ones to test the new behaviors
- Initial clean up of the meters subsystems
Change-Id: I61500b794f27e94abd11637c84bce0dbb2e073f3
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
index 7f7873c..afc6d94 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
@@ -83,6 +83,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.onosproject.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
import static org.onosproject.net.meter.MeterCellId.MeterCellType.INDEX;
@@ -115,7 +116,7 @@
// Meters id related objects
private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
// Available meter identifiers
- private ConcurrentMap<MeterTableKey, DistributedSet<MeterKey>> availableMeterIds;
+ protected ConcurrentMap<MeterTableKey, DistributedSet<MeterKey>> availableMeterIds;
// Atomic counter map for generation of new identifiers;
private static final String METERIDSTORE = "onos-meters-id-store";
private AtomicCounterMap<MeterTableKey> meterIdGenerators;
@@ -130,7 +131,12 @@
.register(Band.Type.class)
.register(MeterState.class)
.register(Meter.Unit.class)
- .register(MeterFailReason.class);
+ .register(MeterFailReason.class)
+ .register(MeterTableKey.class)
+ .register(MeterFeatures.class)
+ .register(DefaultMeterFeatures.class)
+ .register(MeterFeaturesFlag.class)
+ .register(MeterScope.class);
private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -143,6 +149,9 @@
private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
Maps.newConcurrentMap();
+ // Control the user defined index mode for the store.
+ protected boolean userDefinedIndexMode = false;
+
/**
* Defines possible selection strategies to reuse meter ids.
*/
@@ -170,16 +179,7 @@
metersFeatures = storageService.<MeterTableKey, MeterFeatures>eventuallyConsistentMapBuilder()
.withName(METERFEATURESSTORE)
.withTimestampProvider((key, features) -> new WallClockTimestamp())
- .withSerializer(KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(MeterTableKey.class)
- .register(MeterFeatures.class)
- .register(DefaultMeterFeatures.class)
- .register(DefaultBand.class)
- .register(Band.Type.class)
- .register(Meter.Unit.class)
- .register(MeterFailReason.class)
- .register(MeterFeaturesFlag.class)).build();
+ .withSerializer(APP_KRYO_BUILDER).build();
metersFeatures.addListener(featuresMapListener);
// Init the map of the available ids set
// Set will be created when a new Meter Features is pushed to the store
@@ -190,6 +190,7 @@
.withSerializer(Serializer.using(KryoNamespaces.API,
MeterTableKey.class,
MeterScope.class)).build();
+
log.info("Started");
}
@@ -199,15 +200,15 @@
metersFeatures.removeListener(featuresMapListener);
meters.destroy();
metersFeatures.destroy();
- availableMeterIds.forEach((key, set) -> {
- set.destroy();
- });
+ availableMeterIds.forEach((key, set) -> set.destroy());
+
log.info("Stopped");
}
@Override
public CompletableFuture<MeterStoreResult> addOrUpdateMeter(Meter meter) {
- // Init steps
+ // Verify integrity of the index
+ checkArgument(validIndex(meter), "Meter index is not valid");
CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
MeterData data = new MeterData(meter, null);
@@ -227,23 +228,7 @@
@Override
public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
- // Init steps
- CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
- MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
- // Store the future related to the operation
- futures.put(key, future);
- // Store the meter data
- MeterData data = new MeterData(meter, null);
- try {
- meters.put(key, data);
- } catch (StorageException e) {
- log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
- e.getMessage(), e);
- futures.remove(key);
- future.completeExceptionally(e);
- }
- // Done, return the future
- return future;
+ return addOrUpdateMeter(meter);
}
@Override
@@ -309,9 +294,7 @@
Set<MeterTableKey> keys = metersFeatures.keySet().stream()
.filter(key -> key.deviceId().equals(deviceId))
.collect(Collectors.toUnmodifiableSet());
- keys.forEach(k -> {
- metersFeatures.remove(k);
- });
+ keys.forEach(k -> metersFeatures.remove(k));
} catch (StorageException e) {
log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
e.getMessage(), e);
@@ -342,27 +325,6 @@
}
@Override
- // TODO Should we remove it ? We are not using it
- public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
- CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
- MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
- futures.put(key, future);
-
- MeterData data = new MeterData(meter, null);
- try {
- if (meters.computeIfPresent(key, (k, v) -> data) == null) {
- future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
- }
- } catch (StorageException e) {
- log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
- e.getMessage(), e);
- futures.remove(key);
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
public Meter updateMeterState(Meter meter) {
// Update meter if present (stats workflow)
MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
@@ -405,7 +367,7 @@
@Override
public void failedMeter(MeterOperation op, MeterFailReason reason) {
// Meter ops failed (got notification from the sb)
- MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
+ MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().meterCellId());
meters.computeIfPresent(key, (k, v) -> new MeterData(v.meter(), reason));
}
@@ -419,7 +381,7 @@
public void purgeMeter(Meter m) {
// Once we receive the ack from the sb
// create the key and remove definitely the meter
- MeterKey key = MeterKey.key(m.deviceId(), m.id());
+ MeterKey key = MeterKey.key(m.deviceId(), m.meterCellId());
try {
if (Versioned.valueOrNull(meters.remove(key)) != null) {
// Free the id
@@ -441,6 +403,12 @@
@Override
public void purgeMeter(DeviceId deviceId) {
+ // This method is renamed in onos-2.5
+ purgeMeters(deviceId);
+ }
+
+ @Override
+ public void purgeMeters(DeviceId deviceId) {
List<Versioned<MeterData>> metersPendingRemove = meters.stream()
.filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
.map(Map.Entry::getValue)
@@ -456,7 +424,19 @@
e.getValue().value().meter().appId().equals(appId))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
- metersPendingRemove.forEach(versionedMeterKey -> deleteMeterNow(versionedMeterKey.value().meter()));
+ metersPendingRemove.forEach(versionedMeterKey
+ -> purgeMeter(versionedMeterKey.value().meter()));
+ }
+
+ @Override
+ public boolean userDefinedIndexMode(boolean enable) {
+ if (meters.isEmpty() && meterIdGenerators.isEmpty()) {
+ userDefinedIndexMode = enable;
+ } else {
+ log.warn("Unable to {} user defined index mode as store did" +
+ "already some allocations", enable ? "activate" : "deactivate");
+ }
+ return userDefinedIndexMode;
}
@Override
@@ -473,6 +453,28 @@
return features == null ? 0L : features.maxMeter();
}
+ private boolean validIndex(Meter meter) {
+ long index;
+ MeterTableKey key;
+
+ if (meter.meterCellId().type() == PIPELINE_INDEPENDENT) {
+ PiMeterCellId piMeterCellId = (PiMeterCellId) meter.meterCellId();
+ index = piMeterCellId.index();
+ key = MeterTableKey.key(meter.deviceId(), MeterScope.of(piMeterCellId.meterId().id()));
+ } else if (meter.meterCellId().type() == INDEX) {
+ MeterId meterId = (MeterId) meter.meterCellId();
+ index = meterId.id();
+ key = MeterTableKey.key(meter.deviceId(), MeterScope.globalScope());
+ } else {
+ return false;
+ }
+
+ MeterFeatures features = metersFeatures.get(key);
+ long startIndex = features == null ? -1L : features.startIndex();
+ long endIndex = features == null ? -1L : features.endIndex();
+ return index >= startIndex && index <= endIndex;
+ }
+
private long getStartIndex(MeterTableKey key) {
// Leverage the meter features to know the start id
// Since we are using index now
@@ -556,7 +558,7 @@
Set<MeterCellId> localAvailableMeterIds = keySet.stream()
.filter(meterKey ->
meterKey.deviceId().equals(meterTableKey.deviceId()))
- .map(MeterKey::meterId)
+ .map(MeterKey::meterCellId)
.collect(Collectors.toSet());
// Get next available id
MeterCellId meterId = getNextAvailableId(localAvailableMeterIds);
@@ -584,6 +586,10 @@
@Override
public MeterCellId allocateMeterId(DeviceId deviceId, MeterScope meterScope) {
+ if (userDefinedIndexMode) {
+ log.warn("Unable to allocate meter id when user defined index mode is enabled");
+ return null;
+ }
MeterTableKey meterTableKey = MeterTableKey.key(deviceId, meterScope);
MeterCellId meterCellId;
long id;
@@ -638,6 +644,10 @@
}
private void freeMeterId(MeterTableKey meterTableKey, MeterCellId meterCellId) {
+ if (userDefinedIndexMode) {
+ log.warn("Unable to free meter id when user defined index mode is enabled");
+ return;
+ }
long index;
if (meterCellId.type() == PIPELINE_INDEPENDENT) {
PiMeterCellId piMeterCellId = (PiMeterCellId) meterCellId;
diff --git a/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
index 0527755..37ff7b4 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
@@ -21,10 +21,8 @@
import org.junit.Before;
import org.junit.Test;
import org.onlab.junit.TestUtils;
-import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
import org.onosproject.TestApplicationId;
-import org.onosproject.cluster.NodeId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.behaviour.MeterQuery;
import org.onosproject.net.driver.Behaviour;
@@ -37,11 +35,16 @@
import org.onosproject.net.meter.DefaultMeter;
import org.onosproject.net.meter.DefaultMeterFeatures;
import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterCellId;
import org.onosproject.net.meter.MeterFeatures;
import org.onosproject.net.meter.MeterFeaturesKey;
import org.onosproject.net.meter.MeterId;
import org.onosproject.net.meter.MeterKey;
+import org.onosproject.net.meter.MeterScope;
import org.onosproject.net.meter.MeterState;
+import org.onosproject.net.meter.MeterTableKey;
+import org.onosproject.net.pi.model.PiMeterId;
+import org.onosproject.net.pi.runtime.PiMeterCellId;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TestStorageService;
@@ -61,13 +64,6 @@
* Meter store tests.
*/
public class DistributedMeterStoreTest {
-
- // Test node id
- private static final NodeId NID_LOCAL = new NodeId("local");
-
- // Test ip address
- private static final IpAddress LOCALHOST = IpAddress.valueOf("127.0.0.1");
-
// Store under testing
private DistributedMeterStore meterStore;
@@ -82,6 +78,10 @@
private MeterId mid2 = MeterId.meterId(2);
private MeterId mid3 = MeterId.meterId(3);
private MeterId mid10 = MeterId.meterId(10);
+ private MeterCellId cid4 = PiMeterCellId.ofIndirect(
+ PiMeterId.of("foo"), 4);
+ private MeterCellId invalidCid = PiMeterCellId.ofIndirect(
+ PiMeterId.of("foo"), 11);
// Bands used during the tests
private Band b1 = DefaultBand.builder()
@@ -114,6 +114,14 @@
.withBands(Collections.singletonList(b1))
.build();
+ private Meter m4 = DefaultMeter.builder()
+ .forDevice(did3)
+ .fromApp(APP_ID)
+ .withCellId(cid4)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+
// Meter features used during the tests
private MeterFeatures mef1 = DefaultMeterFeatures.builder().forDevice(did1)
.withMaxMeters(3L)
@@ -133,6 +141,17 @@
.withMaxBands((byte) 0)
.withMaxColors((byte) 0)
.build();
+ private MeterFeatures mef3 = DefaultMeterFeatures.builder().forDevice(did3)
+ .withStartIndex(0L)
+ .withEndIndex(10L)
+ .withScope(MeterScope.of("foo"))
+ .withBandTypes(new HashSet<>())
+ .withUnits(new HashSet<>())
+ .hasStats(false)
+ .hasBurst(false)
+ .withMaxBands((byte) 0)
+ .withMaxColors((byte) 0)
+ .build();
@Before
public void setup() {
@@ -158,11 +177,13 @@
meterStore.deactivate();
}
- private void initMeterStore() {
+ private void initMeterStore(boolean enableUserDefinedIndex) {
+ meterStore.userDefinedIndexMode(enableUserDefinedIndex);
// Let's store feature for device 1
meterStore.storeMeterFeatures(mef1);
// Let's store feature for device 2
meterStore.storeMeterFeatures(mef2);
+ meterStore.storeMeterFeatures(mef3);
}
/**
@@ -201,7 +222,7 @@
@Test
public void testAllocateId() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Allocate a meter id and verify is equal to mid1
assertThat(mid1, is(meterStore.allocateMeterId(did1)));
// Allocate a meter id and verify is equal to mid2
@@ -214,7 +235,7 @@
@Test
public void testFreeId() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Allocate a meter id and verify is equal to mid1
assertThat(mid1, is(meterStore.allocateMeterId(did1)));
// Free the above id
@@ -233,7 +254,7 @@
@Test
public void testReuseId() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Reserve id 1
MeterId meterIdOne = meterStore.allocateMeterId(did2);
// Free the above id
@@ -291,7 +312,7 @@
@Test
public void testQueryMeters() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Let's test queryMeters
assertThat(mid1, is(meterStore.allocateMeterId(did3)));
// Let's test queryMeters error
@@ -304,7 +325,7 @@
@Test
public void testMaxMeterError() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Reserve id 1
assertThat(mid1, is(meterStore.allocateMeterId(did1)));
// Reserve id 2
@@ -319,7 +340,7 @@
@Test
public void testStoreMeter() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Simulate the allocation of an id
MeterId idOne = meterStore.allocateMeterId(did1);
// Verify the allocation
@@ -350,7 +371,7 @@
@Test
public void testDeleteMeter() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Simulate the allocation of an id
MeterId idOne = meterStore.allocateMeterId(did1);
// Verify the allocation
@@ -396,7 +417,7 @@
@Test
public void testNoDeleteMeter() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Simulate the allocation of an id
MeterId idOne = meterStore.allocateMeterId(did1);
// Create the key
@@ -440,7 +461,7 @@
@Test
public void testPurgeMeterDeviceAndApp() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// add the meters
((DefaultMeter) m1).setState(MeterState.PENDING_ADD);
((DefaultMeter) m2).setState(MeterState.PENDING_ADD);
@@ -465,7 +486,7 @@
@Test
public void testGetMetersImmutability() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Simulate the allocation of an id
MeterId idOne = meterStore.allocateMeterId(did1);
@@ -513,8 +534,144 @@
metersDevice = meterStore.getAllMeters(did1);
assertThat(2, is(meters.size()));
assertThat(2, is(metersDevice.size()));
+ }
+ /**
+ * Test invalid allocation of a cell id.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidCellId() {
+ initMeterStore(true);
+ // MF defines an end index equals to 10
+ Meter meterBad = DefaultMeter.builder()
+ .forDevice(did3)
+ .fromApp(APP_ID)
+ .withCellId(invalidCid)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+ ((DefaultMeter) meterBad).setState(MeterState.PENDING_ADD);
+ meterStore.storeMeter(meterBad);
+ }
+ /**
+ * Test enabling user defined index mode.
+ */
+ @Test
+ public void testEnableUserDefinedIndex() {
+ initMeterStore(false);
+ assertTrue(meterStore.userDefinedIndexMode(true));
+ }
+
+ /**
+ * Test invalid enabling user defined index mode.
+ */
+ @Test
+ public void testInvalidEnableUserDefinedIndex() {
+ testStoreMeter();
+ assertFalse(meterStore.userDefinedIndexMode(true));
+ }
+
+ /**
+ * Test disabling user defined index mode.
+ */
+ @Test
+ public void testDisableUserDefinedIndex() {
+ initMeterStore(true);
+ assertFalse(meterStore.userDefinedIndexMode(false));
+ }
+
+ /**
+ * Test store meter in user defined index mode.
+ */
+ @Test
+ public void testStoreMeterInUserDefinedIndexMode() {
+ initMeterStore(true);
+ // Let's create a meter
+ Meter meterOne = DefaultMeter.builder()
+ .forDevice(did3)
+ .fromApp(APP_ID)
+ .withCellId(cid4)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+ // Set the state
+ ((DefaultMeter) meterOne).setState(MeterState.PENDING_ADD);
+ // Store the meter
+ meterStore.storeMeter(meterOne);
+ // Let's create meter key
+ MeterKey meterKey = MeterKey.key(did3, cid4);
+ // Verify the store
+ assertThat(1, is(meterStore.getAllMeters().size()));
+ assertThat(1, is(meterStore.getAllMeters(did3).size()));
+ assertThat(m4, is(meterStore.getMeter(meterKey)));
+ }
+
+ /**
+ * Test invalid disabling user defined index mode.
+ */
+ @Test
+ public void testInvalidDisableUserDefinedIndex() {
+ testStoreMeterInUserDefinedIndexMode();
+ assertTrue(meterStore.userDefinedIndexMode(false));
+ }
+
+ /**
+ * Test allocation of meter ids in user defined index mode.
+ */
+ @Test
+ public void testAllocateIdInUserDefinedIndexMode() {
+ initMeterStore(true);
+ assertNull(meterStore.allocateMeterId(did1));
+ }
+
+ /**
+ * Test free of meter ids in user defined index mode.
+ */
+ @Test
+ public void testFreeIdInUserMode() {
+ initMeterStore(true);
+ // Free the id and expect not being available
+ meterStore.freeMeterId(did1, mid1);
+ MeterTableKey globalKey = MeterTableKey.key(did1, MeterScope.globalScope());
+ assertNotNull(meterStore.availableMeterIds.get(globalKey));
+ assertTrue(meterStore.availableMeterIds.get(globalKey).isEmpty());
+ }
+
+ /**
+ * Test delete meter in user defined index mode.
+ */
+ @Test
+ public void testDeleteMeterInUserDefinedIndexMode() {
+ initMeterStore(true);
+ Meter meterOne = DefaultMeter.builder()
+ .forDevice(did3)
+ .fromApp(APP_ID)
+ .withCellId(cid4)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+ ((DefaultMeter) meterOne).setState(MeterState.PENDING_ADD);
+ meterStore.storeMeter(meterOne);
+
+ ((DefaultMeter) meterOne).setState(MeterState.PENDING_REMOVE);
+ MeterKey meterKey = MeterKey.key(did3, cid4);
+ meterStore.deleteMeter(meterOne);
+ CompletableFuture<Void> future = CompletableFuture.runAsync(
+ () -> meterStore.purgeMeter(meterOne)
+ );
+
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ }
+ assertThat(0, is(meterStore.getAllMeters().size()));
+ assertThat(0, is(meterStore.getAllMeters(did3).size()));
+ assertNull(meterStore.getMeter(meterKey));
+ MeterTableKey globalKey = MeterTableKey.key(did1, MeterScope.globalScope());
+ assertNotNull(meterStore.availableMeterIds.get(globalKey));
+ assertTrue(meterStore.availableMeterIds.get(globalKey).isEmpty());
}
// Test class for driver service.