[SDFAB-500][SDFAB-499] Implement user defined index mode for the meter service
- Introduce a boolean to control the meter service modes
- User defined mode does not provide any coordination to the apps
- Only one mode can be active at time
- In addition some sanity checks are peformed by the meter service
- Update existing unit tests and add new ones to test the new behaviors
- Initial clean up of the meters subsystems
Change-Id: I61500b794f27e94abd11637c84bce0dbb2e073f3
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
index 7f7873c..afc6d94 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
@@ -83,6 +83,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.onosproject.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
import static org.onosproject.net.meter.MeterCellId.MeterCellType.INDEX;
@@ -115,7 +116,7 @@
// Meters id related objects
private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
// Available meter identifiers
- private ConcurrentMap<MeterTableKey, DistributedSet<MeterKey>> availableMeterIds;
+ protected ConcurrentMap<MeterTableKey, DistributedSet<MeterKey>> availableMeterIds;
// Atomic counter map for generation of new identifiers;
private static final String METERIDSTORE = "onos-meters-id-store";
private AtomicCounterMap<MeterTableKey> meterIdGenerators;
@@ -130,7 +131,12 @@
.register(Band.Type.class)
.register(MeterState.class)
.register(Meter.Unit.class)
- .register(MeterFailReason.class);
+ .register(MeterFailReason.class)
+ .register(MeterTableKey.class)
+ .register(MeterFeatures.class)
+ .register(DefaultMeterFeatures.class)
+ .register(MeterFeaturesFlag.class)
+ .register(MeterScope.class);
private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -143,6 +149,9 @@
private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
Maps.newConcurrentMap();
+ // Control the user defined index mode for the store.
+ protected boolean userDefinedIndexMode = false;
+
/**
* Defines possible selection strategies to reuse meter ids.
*/
@@ -170,16 +179,7 @@
metersFeatures = storageService.<MeterTableKey, MeterFeatures>eventuallyConsistentMapBuilder()
.withName(METERFEATURESSTORE)
.withTimestampProvider((key, features) -> new WallClockTimestamp())
- .withSerializer(KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(MeterTableKey.class)
- .register(MeterFeatures.class)
- .register(DefaultMeterFeatures.class)
- .register(DefaultBand.class)
- .register(Band.Type.class)
- .register(Meter.Unit.class)
- .register(MeterFailReason.class)
- .register(MeterFeaturesFlag.class)).build();
+ .withSerializer(APP_KRYO_BUILDER).build();
metersFeatures.addListener(featuresMapListener);
// Init the map of the available ids set
// Set will be created when a new Meter Features is pushed to the store
@@ -190,6 +190,7 @@
.withSerializer(Serializer.using(KryoNamespaces.API,
MeterTableKey.class,
MeterScope.class)).build();
+
log.info("Started");
}
@@ -199,15 +200,15 @@
metersFeatures.removeListener(featuresMapListener);
meters.destroy();
metersFeatures.destroy();
- availableMeterIds.forEach((key, set) -> {
- set.destroy();
- });
+ availableMeterIds.forEach((key, set) -> set.destroy());
+
log.info("Stopped");
}
@Override
public CompletableFuture<MeterStoreResult> addOrUpdateMeter(Meter meter) {
- // Init steps
+ // Verify integrity of the index
+ checkArgument(validIndex(meter), "Meter index is not valid");
CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
MeterData data = new MeterData(meter, null);
@@ -227,23 +228,7 @@
@Override
public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
- // Init steps
- CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
- MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
- // Store the future related to the operation
- futures.put(key, future);
- // Store the meter data
- MeterData data = new MeterData(meter, null);
- try {
- meters.put(key, data);
- } catch (StorageException e) {
- log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
- e.getMessage(), e);
- futures.remove(key);
- future.completeExceptionally(e);
- }
- // Done, return the future
- return future;
+ return addOrUpdateMeter(meter);
}
@Override
@@ -309,9 +294,7 @@
Set<MeterTableKey> keys = metersFeatures.keySet().stream()
.filter(key -> key.deviceId().equals(deviceId))
.collect(Collectors.toUnmodifiableSet());
- keys.forEach(k -> {
- metersFeatures.remove(k);
- });
+ keys.forEach(k -> metersFeatures.remove(k));
} catch (StorageException e) {
log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
e.getMessage(), e);
@@ -342,27 +325,6 @@
}
@Override
- // TODO Should we remove it ? We are not using it
- public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
- CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
- MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
- futures.put(key, future);
-
- MeterData data = new MeterData(meter, null);
- try {
- if (meters.computeIfPresent(key, (k, v) -> data) == null) {
- future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
- }
- } catch (StorageException e) {
- log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
- e.getMessage(), e);
- futures.remove(key);
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
public Meter updateMeterState(Meter meter) {
// Update meter if present (stats workflow)
MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
@@ -405,7 +367,7 @@
@Override
public void failedMeter(MeterOperation op, MeterFailReason reason) {
// Meter ops failed (got notification from the sb)
- MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
+ MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().meterCellId());
meters.computeIfPresent(key, (k, v) -> new MeterData(v.meter(), reason));
}
@@ -419,7 +381,7 @@
public void purgeMeter(Meter m) {
// Once we receive the ack from the sb
// create the key and remove definitely the meter
- MeterKey key = MeterKey.key(m.deviceId(), m.id());
+ MeterKey key = MeterKey.key(m.deviceId(), m.meterCellId());
try {
if (Versioned.valueOrNull(meters.remove(key)) != null) {
// Free the id
@@ -441,6 +403,12 @@
@Override
public void purgeMeter(DeviceId deviceId) {
+ // This method is renamed in onos-2.5
+ purgeMeters(deviceId);
+ }
+
+ @Override
+ public void purgeMeters(DeviceId deviceId) {
List<Versioned<MeterData>> metersPendingRemove = meters.stream()
.filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
.map(Map.Entry::getValue)
@@ -456,7 +424,19 @@
e.getValue().value().meter().appId().equals(appId))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
- metersPendingRemove.forEach(versionedMeterKey -> deleteMeterNow(versionedMeterKey.value().meter()));
+ metersPendingRemove.forEach(versionedMeterKey
+ -> purgeMeter(versionedMeterKey.value().meter()));
+ }
+
+ @Override
+ public boolean userDefinedIndexMode(boolean enable) {
+ if (meters.isEmpty() && meterIdGenerators.isEmpty()) {
+ userDefinedIndexMode = enable;
+ } else {
+ log.warn("Unable to {} user defined index mode as store did" +
+ "already some allocations", enable ? "activate" : "deactivate");
+ }
+ return userDefinedIndexMode;
}
@Override
@@ -473,6 +453,28 @@
return features == null ? 0L : features.maxMeter();
}
+ private boolean validIndex(Meter meter) {
+ long index;
+ MeterTableKey key;
+
+ if (meter.meterCellId().type() == PIPELINE_INDEPENDENT) {
+ PiMeterCellId piMeterCellId = (PiMeterCellId) meter.meterCellId();
+ index = piMeterCellId.index();
+ key = MeterTableKey.key(meter.deviceId(), MeterScope.of(piMeterCellId.meterId().id()));
+ } else if (meter.meterCellId().type() == INDEX) {
+ MeterId meterId = (MeterId) meter.meterCellId();
+ index = meterId.id();
+ key = MeterTableKey.key(meter.deviceId(), MeterScope.globalScope());
+ } else {
+ return false;
+ }
+
+ MeterFeatures features = metersFeatures.get(key);
+ long startIndex = features == null ? -1L : features.startIndex();
+ long endIndex = features == null ? -1L : features.endIndex();
+ return index >= startIndex && index <= endIndex;
+ }
+
private long getStartIndex(MeterTableKey key) {
// Leverage the meter features to know the start id
// Since we are using index now
@@ -556,7 +558,7 @@
Set<MeterCellId> localAvailableMeterIds = keySet.stream()
.filter(meterKey ->
meterKey.deviceId().equals(meterTableKey.deviceId()))
- .map(MeterKey::meterId)
+ .map(MeterKey::meterCellId)
.collect(Collectors.toSet());
// Get next available id
MeterCellId meterId = getNextAvailableId(localAvailableMeterIds);
@@ -584,6 +586,10 @@
@Override
public MeterCellId allocateMeterId(DeviceId deviceId, MeterScope meterScope) {
+ if (userDefinedIndexMode) {
+ log.warn("Unable to allocate meter id when user defined index mode is enabled");
+ return null;
+ }
MeterTableKey meterTableKey = MeterTableKey.key(deviceId, meterScope);
MeterCellId meterCellId;
long id;
@@ -638,6 +644,10 @@
}
private void freeMeterId(MeterTableKey meterTableKey, MeterCellId meterCellId) {
+ if (userDefinedIndexMode) {
+ log.warn("Unable to free meter id when user defined index mode is enabled");
+ return;
+ }
long index;
if (meterCellId.type() == PIPELINE_INDEPENDENT) {
PiMeterCellId piMeterCellId = (PiMeterCellId) meterCellId;