[ONOS-7114] Meter Subsystem Refactoring
Changes:
- Moves the meter counters into the store
- Uses an atomic counter map for the meter counters
- Implements an atomic counter map adapter and a test atomic counter map
- Introduces freeMeterId to release a meter id for reuse
- Changes allocateMeterId to reuse freed ids before generating a new one
- Implements unit tests for MeterManager and MeterStore
Change-Id: I45e3debc0e43ca7edcf6e3b4065866634f76f9f7
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
index 5c51478..2903663 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
@@ -16,7 +16,9 @@
package org.onosproject.incubator.store.meter.impl;
import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -27,6 +29,9 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.MeterQuery;
+import org.onosproject.net.driver.DriverHandler;
+import org.onosproject.net.driver.DriverService;
import org.onosproject.net.meter.Band;
import org.onosproject.net.meter.DefaultBand;
import org.onosproject.net.meter.DefaultMeter;
@@ -44,8 +49,12 @@
import org.onosproject.net.meter.MeterStoreDelegate;
import org.onosproject.net.meter.MeterStoreResult;
import org.onosproject.store.AbstractStore;
+import org.onosproject.store.primitives.DefaultDistributedSet;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AtomicCounterMap;
import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedPrimitive;
+import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
@@ -54,12 +63,13 @@
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
-import java.util.Arrays;
-import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import static org.onosproject.incubator.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
import static org.slf4j.LoggerFactory.getLogger;
@@ -77,6 +87,7 @@
private static final String METERSTORE = "onos-meter-store";
private static final String METERFEATURESSTORE = "onos-meter-features-store";
private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
+ private static final String METERIDSTORE = "onos-meters-id-store";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private StorageService storageService;
@@ -87,6 +98,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DriverService driverService;
+
private ConsistentMap<MeterKey, MeterData> meters;
private NodeId local;
@@ -97,7 +111,27 @@
private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
Maps.newConcurrentMap();
- private ConsistentMap<DeviceId, BitSet> availableMeterIds;
+ // Available meter identifiers
+ private DistributedSet<MeterKey> availableMeterIds;
+
+ // Per-device atomic counters used to generate new meter identifiers
+ private AtomicCounterMap<DeviceId> meterIdGenerators;
+
+ /**
+ * Strategies for picking which of the available meter ids is reused.
+ */
+ enum ReuseStrategy {
+ /**
+ * Pick one of the available ids at random.
+ */
+ RANDOM,
+ /**
+ * Pick the first available id in the iteration order of the candidate set.
+ */
+ FIRST_FIT
+ }
+
+ private ReuseStrategy reuseStrategy = FIRST_FIT;
@Activate
public void activate() {
@@ -105,7 +139,7 @@
meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
.withName(METERSTORE)
- .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
+ .withSerializer(Serializer.using(KryoNamespaces.API,
MeterKey.class,
MeterData.class,
DefaultMeter.class,
@@ -119,16 +153,24 @@
meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
.withName(METERFEATURESSTORE)
- .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
- MeterFeaturesKey.class,
- MeterFeatures.class,
- DefaultMeterFeatures.class,
- Band.Type.class,
- Meter.Unit.class,
- MeterFailReason.class)).build();
+ .withSerializer(Serializer.using(KryoNamespaces.API,
+ MeterFeaturesKey.class,
+ MeterFeatures.class,
+ DefaultMeterFeatures.class,
+ Band.Type.class,
+ Meter.Unit.class,
+ MeterFailReason.class)).build();
- availableMeterIds = storageService.<DeviceId, BitSet>consistentMapBuilder()
+ // Init the set of the available ids
+ availableMeterIds = new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
.withName(AVAILABLEMETERIDSTORE)
+ .withSerializer(Serializer.using(KryoNamespaces.API,
+ MeterKey.class)).build(),
+ DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
+
+ // Init atomic map counters
+ meterIdGenerators = storageService.<DeviceId>atomicCounterMapBuilder()
+ .withName(METERIDSTORE)
.withSerializer(Serializer.using(KryoNamespaces.API)).build();
log.info("Started");
@@ -140,67 +182,45 @@
log.info("Stopped");
}
- private void updateMeterIdAvailability(DeviceId deviceId, MeterId id,
- boolean available) {
- availableMeterIds.compute(deviceId, (k, v) -> {
- v = v == null ? new BitSet() : v;
- v.set(id.id().intValue(), available);
- return v;
- });
- }
-
- @Override
- public MeterId firstReusableMeterId(DeviceId deviceId) {
- Versioned<BitSet> bitSetVersioned = availableMeterIds.get(deviceId);
- if (bitSetVersioned == null) {
- return null;
- }
- BitSet value = bitSetVersioned.value();
- int nextSetBit = value.nextSetBit(1);
- if (nextSetBit < 0) {
- return null;
- }
- return MeterId.meterId(nextSetBit);
- }
-
+ /**
+ * Registers a pending future for the meter and writes the meter data to
+ * the meters map. If the write fails, the future is failed with the
+ * storage exception and is no longer tracked as pending.
+ *
+ * @param meter meter to store
+ * @return future for the outcome of the store operation
+ */
@Override
public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
+ // Init steps
CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
- updateMeterIdAvailability(meter.deviceId(), meter.id(), false);
+ // Store the future related to the operation
futures.put(key, future);
+ // Store the meter data
MeterData data = new MeterData(meter, null, local);
-
try {
meters.put(key, data);
} catch (StorageException e) {
+ // The write failed: stop tracking this future so it does not linger
+ // in the pending map (only removed if it is still the registered one)
+ futures.remove(key, future);
future.completeExceptionally(e);
}
-
+ // Done, return the future
return future;
-
}
+ /**
+ * Registers a pending future for the removal and replaces the stored meter
+ * data, if present, with the data of the meter to delete. When the meter is
+ * not in the store the future completes with success right away; when the
+ * store update fails it completes exceptionally. In both of these cases the
+ * future is no longer tracked as pending.
+ *
+ * @param meter meter to delete
+ * @return future for the outcome of the delete operation
+ */
@Override
public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
+ // Init steps
CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+ // Store the future related to the operation
futures.put(key, future);
-
+ // Create the meter data
MeterData data = new MeterData(meter, null, local);
-
- // update the state of the meter. It will be pruned by observing
+ // Update the state of the meter. It will be pruned by observing
// that it has been removed from the dataplane.
try {
+ // If it does not exist in the system
if (meters.computeIfPresent(key, (k, v) -> data) == null) {
+ // Nothing was updated: stop tracking the future, it is completed here
+ futures.remove(key, future);
+ // Complete immediately
future.complete(MeterStoreResult.success());
}
- updateMeterIdAvailability(meter.deviceId(), meter.id(), true);
} catch (StorageException e) {
+ // The update failed: stop tracking this future so it does not linger
+ // in the pending map (only removed if it is still the registered one)
+ futures.remove(key, future);
future.completeExceptionally(e);
}
-
-
+ // Done, return the future
return future;
}
@@ -289,9 +309,15 @@
+ /**
+ * Removes the meter right away: forgets any pending future for it, deletes
+ * its store entry, releases its id for reuse and notifies the delegate
+ * with a METER_REMOVED event.
+ *
+ * @param m meter to purge
+ */
@Override
public void deleteMeterNow(Meter m) {
+ // Create the key
MeterKey key = MeterKey.key(m.deviceId(), m.id());
+ // Remove the future
futures.remove(key);
+ // Remove the meter
meters.remove(key);
+ // Free the id
+ freeMeterId(m.deviceId(), m.id());
+ // Finally notify the delegate
notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, m));
}
@@ -301,6 +327,108 @@
return features == null ? 0L : features.maxMeter();
}
+    /**
+     * Asks the device driver, through the MeterQuery behaviour, for the
+     * maximum number of meters.
+     *
+     * @param device device to query
+     * @return value reported by the MeterQuery behaviour, or 0 when no driver
+     *         handler could be created or the handler lacks that behaviour
+     */
+    private long queryMaxMeters(DeviceId device) {
+        DriverHandler driverHandler = driverService.createHandler(device);
+        boolean canQuery = driverHandler != null
+                && driverHandler.hasBehaviour(MeterQuery.class);
+        if (canQuery) {
+            // Delegate to the behaviour exposed by the driver
+            return driverHandler.behaviour(MeterQuery.class).getMaxMeters();
+        }
+        // The maximum cannot be determined for this device
+        return 0L;
+    }
+
+    /**
+     * Marks a meter id of a device as available or as no longer available.
+     *
+     * @param deviceId device owning the meter id
+     * @param id meter id to update
+     * @param available true to add the id to the available set, false to remove it
+     * @return result of the add or remove performed on the available set
+     */
+    private boolean updateMeterIdAvailability(DeviceId deviceId, MeterId id,
+                                              boolean available) {
+        MeterKey meterKey = MeterKey.key(deviceId, id);
+        if (available) {
+            return availableMeterIds.add(meterKey);
+        }
+        return availableMeterIds.remove(meterKey);
+    }
+
+    /**
+     * Chooses the next candidate among the given ids according to the
+     * configured reuse strategy.
+     *
+     * @param availableIds candidate meter ids
+     * @return the chosen id, or null when there are no candidates
+     */
+    private MeterId getNextAvailableId(Set<MeterId> availableIds) {
+        if (availableIds.isEmpty()) {
+            return null;
+        }
+        int candidates = availableIds.size();
+        // A random pick only makes sense with more than one candidate
+        boolean pickRandom = reuseStrategy != FIRST_FIT && candidates != 1;
+        if (pickRandom) {
+            return Iterables.get(availableIds, RandomUtils.nextInt(candidates));
+        }
+        // First fit: take whatever the set iterates first
+        return availableIds.iterator().next();
+    }
+
+    /**
+     * Tries to reserve one of the ids currently marked available for the device.
+     * Candidates are copied from the available set; each one is claimed by
+     * removing it from that set, and a failed removal (presumably because
+     * the id was claimed elsewhere in the meantime) moves on to the next one.
+     *
+     * @param deviceId device for which an id is requested
+     * @return the reserved id, or null when no available id could be reserved
+     */
+    private MeterId firstReusableMeterId(DeviceId deviceId) {
+        // Local snapshot of the ids available for this device
+        Set<MeterId> candidates = availableMeterIds.stream()
+                .filter(key -> key.deviceId().equals(deviceId))
+                .map(MeterKey::meterId)
+                .collect(Collectors.toSet());
+        for (MeterId candidate = getNextAvailableId(candidates);
+             candidate != null;
+             candidate = getNextAvailableId(candidates)) {
+            // Reserving succeeds only if the id was still in the available set
+            if (updateMeterIdAvailability(deviceId, candidate, false)) {
+                return candidate;
+            }
+            // Could not reserve it: discard the candidate and pick another
+            candidates.remove(candidate);
+        }
+        return null;
+    }
+
+    /**
+     * Allocates a meter id for the device. An available id is reused when one
+     * can be reserved; otherwise a new value is taken from the device counter
+     * and checked against the maximum number of meters, read from the meter
+     * features or, when those report 0, queried from the driver.
+     *
+     * @param deviceId device for which an id is requested
+     * @return the allocated id, or null when the maximum is unknown (0) or
+     *         the generated value is not below it
+     */
+    @Override
+    public MeterId allocateMeterId(DeviceId deviceId) {
+        // Prefer an id that was previously freed
+        MeterId reusedId = firstReusableMeterId(deviceId);
+        if (reusedId != null) {
+            return reusedId;
+        }
+        // Upper limit: meter features first, driver query as fallback
+        long limit = getMaxMeters(MeterFeaturesKey.key(deviceId));
+        if (limit == 0L) {
+            limit = queryMaxMeters(deviceId);
+        }
+        // Without a known limit no new id can be generated
+        if (limit == 0L) {
+            return null;
+        }
+        long freshId = meterIdGenerators.incrementAndGet(deviceId);
+        // NOTE(review): a value equal to the limit is rejected as well - confirm
+        // whether the limit itself is meant to be a valid id
+        return freshId < limit ? MeterId.meterId(freshId) : null;
+    }
+
+    /**
+     * Releases a meter id by adding it to the set of available ids.
+     *
+     * @param deviceId device owning the meter id
+     * @param meterId meter id to release
+     */
+    @Override
+    public void freeMeterId(DeviceId deviceId, MeterId meterId) {
+        MeterKey released = MeterKey.key(deviceId, meterId);
+        availableMeterIds.add(released);
+    }
+
private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
@Override
public void event(MapEvent<MeterKey, MeterData> event) {