[SDFAB-352][SDFAB-353] Retrieve MeterFeatures from the P4RT southbound, Extend MeterProviderService and revisit MeterStore
Change-Id: If0dae53643988cb551ff5020abd792cb6d33ff6b
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
index 11570f7..2fdba6f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
@@ -31,6 +31,7 @@
import org.onosproject.net.meter.DefaultMeter;
import org.onosproject.net.meter.DefaultMeterFeatures;
import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterCellId;
import org.onosproject.net.meter.MeterEvent;
import org.onosproject.net.meter.MeterFailReason;
import org.onosproject.net.meter.MeterFeatures;
@@ -39,10 +40,14 @@
import org.onosproject.net.meter.MeterId;
import org.onosproject.net.meter.MeterKey;
import org.onosproject.net.meter.MeterOperation;
+import org.onosproject.net.meter.MeterScope;
import org.onosproject.net.meter.MeterState;
import org.onosproject.net.meter.MeterStore;
import org.onosproject.net.meter.MeterStoreDelegate;
import org.onosproject.net.meter.MeterStoreResult;
+import org.onosproject.net.meter.MeterTableKey;
+import org.onosproject.net.pi.model.PiMeterId;
+import org.onosproject.net.pi.runtime.PiMeterCellId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.primitives.DefaultDistributedSet;
import org.onosproject.store.serializers.KryoNamespaces;
@@ -50,12 +55,16 @@
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.WallClockTimestamp;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -64,6 +73,8 @@
import org.slf4j.Logger;
import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -73,6 +84,8 @@
import static org.onosproject.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
+import static org.onosproject.net.meter.MeterCellId.MeterCellType.INDEX;
+import static org.onosproject.net.meter.MeterCellId.MeterCellType.PIPELINE_INDEPENDENT;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -88,20 +101,22 @@
// Meters map related objects
private static final String METERSTORE = "onos-meter-store";
private ConsistentMap<MeterKey, MeterData> meters;
- private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
+ private MapEventListener<MeterKey, MeterData> metersMapListener = new InternalMetersMapEventListener();
private Map<MeterKey, MeterData> metersMap;
// Meters features related objects
private static final String METERFEATURESSTORE = "onos-meter-features-store";
- private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
+ private EventuallyConsistentMap<MeterTableKey, MeterFeatures> metersFeatures;
+ private EventuallyConsistentMapListener<MeterTableKey, MeterFeatures> featuresMapListener =
+ new InternalFeaturesMapEventListener();
// Meters id related objects
private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
// Available meter identifiers
- private DistributedSet<MeterKey> availableMeterIds;
+ private ConcurrentMap<MeterTableKey, DistributedSet<MeterKey>> availableMeterIds;
// Atomic counter map for generation of new identifiers;
private static final String METERIDSTORE = "onos-meters-id-store";
- private AtomicCounterMap<DeviceId> meterIdGenerators;
+ private AtomicCounterMap<MeterTableKey> meterIdGenerators;
// Serializer related objects
private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
@@ -147,35 +162,44 @@
meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
.withName(METERSTORE)
.withSerializer(serializer).build();
- meters.addListener(mapListener);
- // Init meter features map (meaningful only for OpenFlow protocol)
- meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
- .withName(METERFEATURESSTORE)
- .withSerializer(Serializer.using(KryoNamespaces.API,
- MeterFeaturesKey.class,
- MeterFeatures.class,
- DefaultMeterFeatures.class,
- Band.Type.class,
- Meter.Unit.class,
- MeterFailReason.class,
- MeterFeaturesFlag.class)).build();
+ meters.addListener(metersMapListener);
metersMap = meters.asJavaMap();
- // Init the set of the available ids
- availableMeterIds = new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
- .withName(AVAILABLEMETERIDSTORE)
- .withSerializer(Serializer.using(KryoNamespaces.API,
- MeterKey.class)).build(),
- DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
+ // Init meter features map
+ metersFeatures = storageService.<MeterTableKey, MeterFeatures>eventuallyConsistentMapBuilder()
+ .withName(METERFEATURESSTORE)
+ .withTimestampProvider((key, features) -> new WallClockTimestamp())
+ .withSerializer(KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(MeterTableKey.class)
+ .register(MeterFeatures.class)
+ .register(DefaultMeterFeatures.class)
+ .register(DefaultBand.class)
+ .register(Band.Type.class)
+ .register(Meter.Unit.class)
+ .register(MeterFailReason.class)
+ .register(MeterFeaturesFlag.class)).build();
+ metersFeatures.addListener(featuresMapListener);
+ // Init the map of the available ids set
+ // Set will be created when a new Meter Features is pushed to the store
+ availableMeterIds = new ConcurrentHashMap<>();
// Init atomic map counters
- meterIdGenerators = storageService.<DeviceId>atomicCounterMapBuilder()
+ meterIdGenerators = storageService.<MeterTableKey>atomicCounterMapBuilder()
.withName(METERIDSTORE)
- .withSerializer(Serializer.using(KryoNamespaces.API)).build();
+ .withSerializer(Serializer.using(KryoNamespaces.API,
+ MeterTableKey.class,
+ MeterScope.class)).build();
log.info("Started");
}
@Deactivate
public void deactivate() {
- meters.removeListener(mapListener);
+ meters.removeListener(metersMapListener);
+ metersFeatures.removeListener(featuresMapListener);
+ meters.destroy();
+ metersFeatures.destroy();
+ availableMeterIds.forEach((key, set) -> {
+ set.destroy();
+ });
log.info("Stopped");
}
@@ -229,11 +253,11 @@
@Override
public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
- // Store meter features, this is done once for each device
+ // Store meter features, this is done once for each features of every device
MeterStoreResult result = MeterStoreResult.success();
- MeterFeaturesKey key = MeterFeaturesKey.key(meterfeatures.deviceId());
+ MeterTableKey key = MeterTableKey.key(meterfeatures.deviceId(), meterfeatures.scope());
try {
- meterFeatures.putIfAbsent(key, meterfeatures);
+ metersFeatures.put(key, meterfeatures);
} catch (StorageException e) {
log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
e.getMessage(), e);
@@ -244,16 +268,20 @@
@Override
public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
- // Remove meter features - these ops are meaningful only for OpenFlow
MeterStoreResult result = MeterStoreResult.success();
- MeterFeaturesKey key = MeterFeaturesKey.key(deviceId);
try {
- meterFeatures.remove(key);
+ Set<MeterTableKey> keys = metersFeatures.keySet().stream()
+ .filter(key -> key.deviceId().equals(deviceId))
+ .collect(Collectors.toUnmodifiableSet());
+ keys.forEach(k -> {
+ metersFeatures.remove(k);
+ });
} catch (StorageException e) {
log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
- e.getMessage(), e);
+ e.getMessage(), e);
result = MeterStoreResult.fail(TIMEOUT);
}
+
return result;
}
@@ -327,13 +355,27 @@
@Override
public void deleteMeterNow(Meter m) {
+ // This method is renamed in onos-2.5
+ purgeMeter(m);
+ }
+
+ @Override
+ public void purgeMeter(Meter m) {
// Once we receive the ack from the sb
// create the key and remove definitely the meter
MeterKey key = MeterKey.key(m.deviceId(), m.id());
try {
if (Versioned.valueOrNull(meters.remove(key)) != null) {
// Free the id
- freeMeterId(m.deviceId(), m.id());
+ MeterScope scope;
+ if (m.meterCellId().type() == PIPELINE_INDEPENDENT) {
+ PiMeterCellId piMeterCellId = (PiMeterCellId) m.meterCellId();
+ scope = MeterScope.of(piMeterCellId.meterId().id());
+ } else {
+ scope = MeterScope.globalScope();
+ }
+ MeterTableKey meterTableKey = MeterTableKey.key(m.deviceId(), scope);
+ freeMeterId(meterTableKey, m.meterCellId());
}
} catch (StorageException e) {
log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
@@ -350,16 +392,41 @@
.collect(Collectors.toList());
// Remove definitely the meter
metersPendingRemove.forEach(versionedMeterKey
- -> deleteMeterNow(versionedMeterKey.value().meter()));
+ -> purgeMeter(versionedMeterKey.value().meter()));
}
@Override
public long getMaxMeters(MeterFeaturesKey key) {
// Leverage the meter features to know the max id
- MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
+ // Create a Meter Table key with FeaturesKey's device and global scope
+ MeterTableKey meterTableKey = MeterTableKey.key(key.deviceId(), MeterScope.globalScope());
+ return getMaxMeters(meterTableKey);
+ }
+
+ private long getMaxMeters(MeterTableKey key) {
+ // Leverage the meter features to know the max id
+ MeterFeatures features = metersFeatures.get(key);
return features == null ? 0L : features.maxMeter();
}
+ private long getStartIndex(MeterTableKey key) {
+ // Leverage the meter features to know the start id
+ // Since we are using index now
+ // if there is no features related to the key
+ // -1 is returned
+ MeterFeatures features = metersFeatures.get(key);
+ return features == null ? -1L : features.startIndex();
+ }
+
+ private long getEndIndex(MeterTableKey key) {
+ // Leverage the meter features to know the max id
+ // Since we are using index now
+ // if there is no features related to the key
+ // -1 is returned
+ MeterFeatures features = metersFeatures.get(key);
+ return features == null ? -1L : features.endIndex();
+ }
+
// queryMaxMeters is implemented in FullMetersAvailable behaviour.
private long queryMaxMeters(DeviceId device) {
// Get driver handler for this device
@@ -371,18 +438,31 @@
}
// Get the behavior
MeterQuery query = handler.behaviour(MeterQuery.class);
+ // Insert a new available key set to the map
+ String setName = AVAILABLEMETERIDSTORE + "-" + device + "global";
+ MeterTableKey meterTableKey = MeterTableKey.key(device, MeterScope.globalScope());
+ insertAvailableKeySet(meterTableKey, setName);
// Return as max meter the result of the query
return query.getMaxMeters();
}
- private boolean updateMeterIdAvailability(DeviceId deviceId, MeterId id,
+ private boolean updateMeterIdAvailability(MeterTableKey meterTableKey, MeterCellId id,
boolean available) {
+ // Retrieve the set first
+ DistributedSet<MeterKey> keySet = availableMeterIds.get(meterTableKey);
+ if (keySet == null) {
+ // A reusable set should be inserted when a features is pushed
+ log.warn("Reusable Key set for device: {} scope: {} not found",
+ meterTableKey.deviceId(), meterTableKey.scope());
+ return false;
+ }
// According to available, make available or unavailable a meter key
- return available ? availableMeterIds.add(MeterKey.key(deviceId, id)) :
- availableMeterIds.remove(MeterKey.key(deviceId, id));
+ DeviceId deviceId = meterTableKey.deviceId();
+ return available ? keySet.add(MeterKey.key(deviceId, id)) :
+ keySet.remove(MeterKey.key(deviceId, id));
}
- private MeterId getNextAvailableId(Set<MeterId> availableIds) {
+ private MeterCellId getNextAvailableId(Set<MeterCellId> availableIds) {
// If there are no available ids
if (availableIds.isEmpty()) {
// Just end the cycle
@@ -399,18 +479,27 @@
}
// Implements reuse strategy
- private MeterId firstReusableMeterId(DeviceId deviceId) {
+ private MeterCellId firstReusableMeterId(MeterTableKey meterTableKey) {
+ // Create a Table key and use it to retrieve the reusable meterCellId set
+ DistributedSet<MeterKey> keySet = availableMeterIds.get(meterTableKey);
+ if (keySet == null) {
+ // A reusable set should be inserted when a features is pushed
+ log.warn("Reusable Key set for device: {} scope: {} not found",
+ meterTableKey.deviceId(), meterTableKey.scope());
+ return null;
+ }
// Filter key related to device id, and reduce to meter ids
- Set<MeterId> localAvailableMeterIds = availableMeterIds.stream()
- .filter(meterKey -> meterKey.deviceId().equals(deviceId))
+ Set<MeterCellId> localAvailableMeterIds = keySet.stream()
+ .filter(meterKey ->
+ meterKey.deviceId().equals(meterTableKey.deviceId()))
.map(MeterKey::meterId)
.collect(Collectors.toSet());
// Get next available id
- MeterId meterId = getNextAvailableId(localAvailableMeterIds);
+ MeterCellId meterId = getNextAvailableId(localAvailableMeterIds);
// Iterate until there are items
while (meterId != null) {
// If we are able to reserve the id
- if (updateMeterIdAvailability(deviceId, meterId, false)) {
+ if (updateMeterIdAvailability(meterTableKey, meterId, false)) {
// Just end
return meterId;
}
@@ -425,49 +514,86 @@
@Override
public MeterId allocateMeterId(DeviceId deviceId) {
- // Init steps
- MeterId meterId;
+ // We use global scope for MeterId
+ return (MeterId) allocateMeterId(deviceId, MeterScope.globalScope());
+ }
+
+ @Override
+ public MeterCellId allocateMeterId(DeviceId deviceId, MeterScope meterScope) {
+ MeterTableKey meterTableKey = MeterTableKey.key(deviceId, meterScope);
+ MeterCellId meterCellId;
long id;
- // Try to reuse meter id
- meterId = firstReusableMeterId(deviceId);
- // We found a reusable id, return
- if (meterId != null) {
- return meterId;
+ // First, search for reusable key
+ meterCellId = firstReusableMeterId(meterTableKey);
+ if (meterCellId != null) {
+ // A reusable key is found
+ return meterCellId;
}
- // If there was no reusable MeterId we have to generate a new value
- // using maxMeters as upper limit.
- long maxMeters = getMaxMeters(MeterFeaturesKey.key(deviceId));
+ // If there was no reusable meter id we have to generate a new value
+ // using start and end index as lower and upper bound respectively.
+ long startIndex = getStartIndex(meterTableKey);
+ long endIndex = getEndIndex(meterTableKey);
// If the device does not give us MeterFeatures
- if (maxMeters == 0L) {
+ if (startIndex == -1L || endIndex == -1L) {
// MeterFeatures couldn't be retrieved, fallback to queryMeters.
- maxMeters = queryMaxMeters(deviceId);
- }
- // If we don't know the max, cannot proceed
- if (maxMeters == 0L) {
- return null;
+ // Only meaningful to OpenFLow
+ long maxMeters = queryMaxMeters(deviceId);
+ // If we don't know the max, cannot proceed
+ if (maxMeters == 0L) {
+ return null;
+ } else {
+ // OpenFlow meter index starts from 1, ends with max-1
+ startIndex = 1L;
+ endIndex = maxMeters - 1;
+ }
}
// Get a new value
- id = meterIdGenerators.incrementAndGet(deviceId);
- // Check with the max, and if the value is bigger, cannot proceed
- if (id >= maxMeters) {
+ // If the value is smaller than the start index, get another one
+ do {
+ id = meterIdGenerators.incrementAndGet(meterTableKey);
+ } while (id < startIndex);
+ // Check with the end index, and if the value is bigger, cannot proceed
+ if (id > endIndex) {
return null;
}
// Done, return the value
- return MeterId.meterId(id);
+ // If we are using global scope, return a MeterId
+ // Else, return a PiMeterId
+ if (meterScope.isGlobal()) {
+ return MeterId.meterId(id);
+ } else {
+ return PiMeterCellId.ofIndirect(PiMeterId.of(meterScope.id()), id);
+ }
+
}
@Override
public void freeMeterId(DeviceId deviceId, MeterId meterId) {
+ MeterTableKey meterTableKey = MeterTableKey.key(deviceId, MeterScope.globalScope());
+ freeMeterId(meterTableKey, meterId);
+ }
+
+ private void freeMeterId(MeterTableKey meterTableKey, MeterCellId meterCellId) {
+ long index;
+ if (meterCellId.type() == PIPELINE_INDEPENDENT) {
+ PiMeterCellId piMeterCellId = (PiMeterCellId) meterCellId;
+ index = piMeterCellId.index();
+ } else if (meterCellId.type() == INDEX) {
+ MeterId meterId = (MeterId) meterCellId;
+ index = meterId.id();
+ } else {
+ return;
+ }
// Avoid to free meter not allocated
- if (meterIdGenerators.get(deviceId) < meterId.id()) {
+ if (meterIdGenerators.get(meterTableKey) < index) {
return;
}
// Update the availability
- updateMeterIdAvailability(deviceId, meterId, true);
+ updateMeterIdAvailability(meterTableKey, meterCellId, true);
}
// Enabling the events distribution across the cluster
- private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
+ private class InternalMetersMapEventListener implements MapEventListener<MeterKey, MeterData> {
@Override
public void event(MapEvent<MeterKey, MeterData> event) {
MeterKey key = event.key();
@@ -523,8 +649,42 @@
default:
log.warn("Unknown Map event type {}", event.type());
}
-
}
}
+ private class InternalFeaturesMapEventListener implements
+ EventuallyConsistentMapListener<MeterTableKey, MeterFeatures> {
+ @Override
+ public void event(EventuallyConsistentMapEvent<MeterTableKey, MeterFeatures> event) {
+ MeterTableKey meterTableKey = event.key();
+ MeterFeatures meterFeatures = event.value();
+ switch (event.type()) {
+ case PUT:
+ // Put a new available meter id set to the map
+ String setName = AVAILABLEMETERIDSTORE + "-" +
+ meterFeatures.deviceId() + meterFeatures.scope().id();
+ insertAvailableKeySet(meterTableKey, setName);
+ break;
+ case REMOVE:
+ // Remove the set
+ DistributedSet<MeterKey> set = availableMeterIds.remove(meterTableKey);
+ if (set != null) {
+ set.destroy();
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ private void insertAvailableKeySet(MeterTableKey meterTableKey, String setName) {
+ DistributedSet<MeterKey> availableMeterIdSet =
+ new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
+ .withName(setName)
+ .withSerializer(Serializer.using(KryoNamespaces.API,
+ MeterKey.class)).build(),
+ DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
+ availableMeterIds.put(meterTableKey, availableMeterIdSet);
+ }
}