[ONOS-7114] Meter Subsystem Refactoring

Changes:
- Moves meter counters into the store
- Uses atomic counter map for meter counters
- Implements atomic counter map adapter and test atomic counter map
- Introduces freeMeterId to release meter ids for reuse
- Changes allocateMeterId to reuse freed ids before generating new ones
- Implements unit tests for MeterManager and MeterStore

Change-Id: I45e3debc0e43ca7edcf6e3b4065866634f76f9f7
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
index 5c51478..2903663 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
@@ -16,7 +16,9 @@
 package org.onosproject.incubator.store.meter.impl;
 
 import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang.math.RandomUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -27,6 +29,9 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.MeterQuery;
+import org.onosproject.net.driver.DriverHandler;
+import org.onosproject.net.driver.DriverService;
 import org.onosproject.net.meter.Band;
 import org.onosproject.net.meter.DefaultBand;
 import org.onosproject.net.meter.DefaultMeter;
@@ -44,8 +49,12 @@
 import org.onosproject.net.meter.MeterStoreDelegate;
 import org.onosproject.net.meter.MeterStoreResult;
 import org.onosproject.store.AbstractStore;
+import org.onosproject.store.primitives.DefaultDistributedSet;
 import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AtomicCounterMap;
 import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedPrimitive;
+import org.onosproject.store.service.DistributedSet;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
@@ -54,12 +63,13 @@
 import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
-import java.util.Arrays;
-import java.util.BitSet;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
+import static org.onosproject.incubator.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
 import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -77,6 +87,7 @@
     private static final String METERSTORE = "onos-meter-store";
     private static final String METERFEATURESSTORE = "onos-meter-features-store";
     private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
+    private static final String METERIDSTORE = "onos-meters-id-store";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private StorageService storageService;
@@ -87,6 +98,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private ClusterService clusterService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DriverService driverService;
+
     private ConsistentMap<MeterKey, MeterData> meters;
     private NodeId local;
 
@@ -97,7 +111,27 @@
     private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
             Maps.newConcurrentMap();
 
-    private ConsistentMap<DeviceId, BitSet> availableMeterIds;
+    // Available meter identifiers
+    private DistributedSet<MeterKey> availableMeterIds;
+
+    // Atomic counter map used to generate new meter identifiers, one counter per device
+    private AtomicCounterMap<DeviceId> meterIdGenerators;
+
+    /**
+     * Strategies for picking which previously freed meter id to reuse.
+     */
+    enum ReuseStrategy {
+        /**
+         * Pick one of the available ids at random.
+         */
+        RANDOM,
+        /**
+         * Pick the first available id in the iteration order of the candidate set.
+         */
+        FIRST_FIT
+    }
+
+    private ReuseStrategy reuseStrategy = FIRST_FIT;
 
     @Activate
     public void activate() {
@@ -105,7 +139,7 @@
 
         meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
                     .withName(METERSTORE)
-                    .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
+                    .withSerializer(Serializer.using(KryoNamespaces.API,
                                                      MeterKey.class,
                                                      MeterData.class,
                                                      DefaultMeter.class,
@@ -119,16 +153,24 @@
 
         meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
                 .withName(METERFEATURESSTORE)
-                .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
-                        MeterFeaturesKey.class,
-                        MeterFeatures.class,
-                        DefaultMeterFeatures.class,
-                        Band.Type.class,
-                        Meter.Unit.class,
-                        MeterFailReason.class)).build();
+                .withSerializer(Serializer.using(KryoNamespaces.API,
+                                                 MeterFeaturesKey.class,
+                                                 MeterFeatures.class,
+                                                 DefaultMeterFeatures.class,
+                                                 Band.Type.class,
+                                                 Meter.Unit.class,
+                                                 MeterFailReason.class)).build();
 
-        availableMeterIds = storageService.<DeviceId, BitSet>consistentMapBuilder()
+        // Init the set of the available ids
+        availableMeterIds = new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
                 .withName(AVAILABLEMETERIDSTORE)
+                .withSerializer(Serializer.using(KryoNamespaces.API,
+                                                 MeterKey.class)).build(),
+                DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
+
+        // Init atomic map counters
+        meterIdGenerators = storageService.<DeviceId>atomicCounterMapBuilder()
+                .withName(METERIDSTORE)
                 .withSerializer(Serializer.using(KryoNamespaces.API)).build();
 
         log.info("Started");
@@ -140,67 +182,45 @@
         log.info("Stopped");
     }
 
-    private void updateMeterIdAvailability(DeviceId deviceId, MeterId id,
-                                           boolean available) {
-        availableMeterIds.compute(deviceId, (k, v) -> {
-            v = v == null ? new BitSet() : v;
-            v.set(id.id().intValue(), available);
-            return v;
-        });
-    }
-
-    @Override
-    public MeterId firstReusableMeterId(DeviceId deviceId) {
-        Versioned<BitSet> bitSetVersioned = availableMeterIds.get(deviceId);
-        if (bitSetVersioned == null) {
-            return null;
-        }
-        BitSet value = bitSetVersioned.value();
-        int nextSetBit = value.nextSetBit(1);
-        if (nextSetBit < 0) {
-            return null;
-        }
-        return MeterId.meterId(nextSetBit);
-    }
-
     @Override
     public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
+        // Build the result future and the key (device id + meter id) for this meter
         CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
         MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
-        updateMeterIdAvailability(meter.deviceId(), meter.id(), false);
+        // Register the future under the meter key
         futures.put(key, future);
+        // Wrap the meter with the local node id and write it to the meters map
         MeterData data = new MeterData(meter, null, local);
-
         try {
             meters.put(key, data);
         } catch (StorageException e) {
             future.completeExceptionally(e);
         }
-
+        // Return the future; on a StorageException it has already been completed exceptionally
         return future;
-
     }
 
     @Override
     public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
+        // Build the result future and the key (device id + meter id) for this meter
         CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
         MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+        // Register the future under the meter key
         futures.put(key, future);
-
+        // Build the replacement entry for the meter, tagged with the local node id
         MeterData data = new MeterData(meter, null, local);
-
-        // update the state of the meter. It will be pruned by observing
+        // Update the state of the meter. It will be pruned by observing
         // that it has been removed from the dataplane.
         try {
+            // A null result means the store holds no entry for this key
             if (meters.computeIfPresent(key, (k, v) -> data) == null) {
+                // Nothing to remove: report success right away
                 future.complete(MeterStoreResult.success());
             }
-            updateMeterIdAvailability(meter.deviceId(), meter.id(), true);
         } catch (StorageException e) {
             future.completeExceptionally(e);
         }
-
-
+        // Return the future; on a StorageException it has already been completed exceptionally
         return future;
     }
 
@@ -289,9 +309,15 @@
 
     @Override
     public void deleteMeterNow(Meter m) {
+        // Key identifying the meter (device id + meter id)
         MeterKey key = MeterKey.key(m.deviceId(), m.id());
+        // Drop any pending future registered for this meter
         futures.remove(key);
+        // Remove the meter entry from the store
         meters.remove(key);
+        // Return the id to the available set. NOTE(review): done even if no entry was removed - confirm
+        freeMeterId(m.deviceId(), m.id());
+        // Finally tell the delegate that the meter has been removed
         notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, m));
     }
 
@@ -301,6 +327,108 @@
         return features == null ? 0L : features.maxMeter();
     }
 
+    // Asks the device driver for the maximum number of meters (MeterQuery, implemented in FullMetersAvailable).
+    private long queryMaxMeters(DeviceId device) {
+        // Create a driver handler for this device
+        DriverHandler handler = driverService.createHandler(device);
+        // No handler, or the driver does not offer the MeterQuery behaviour
+        if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
+            // The maximum is unknown, reported as 0
+            return 0L;
+        }
+        // Obtain the MeterQuery behaviour
+        MeterQuery query = handler.behaviour(MeterQuery.class);
+        // The query result is used as the maximum number of meters
+        return query.getMaxMeters();
+    }
+
+    private boolean updateMeterIdAvailability(DeviceId deviceId, MeterId id,
+                                              boolean available) {
+        // available == true adds the key to the available set, false removes it; returns the add/remove result
+        return available ? availableMeterIds.add(MeterKey.key(deviceId, id)) :
+                availableMeterIds.remove(MeterKey.key(deviceId, id));
+    }
+
+    private MeterId getNextAvailableId(Set<MeterId> availableIds) {
+        // No candidates left
+        if (availableIds.isEmpty()) {
+            // Signal the caller that there is nothing to pick
+            return null;
+        }
+        // FIRST_FIT, or a single candidate: take the first element the iterator yields
+        if (reuseStrategy == FIRST_FIT || availableIds.size() == 1) {
+            return availableIds.iterator().next();
+        }
+        // Otherwise (RANDOM): number of candidates to choose from
+        int size = availableIds.size();
+        // Return the element at a randomly drawn position below size
+        return Iterables.get(availableIds, RandomUtils.nextInt(size));
+    }
+
+    // Reserves and returns a freed id for the device, or null if none could be reserved
+    private MeterId firstReusableMeterId(DeviceId deviceId) {
+        // Snapshot the available keys of this device, reduced to their meter ids
+        Set<MeterId> localAvailableMeterIds = availableMeterIds.stream()
+                .filter(meterKey -> meterKey.deviceId().equals(deviceId))
+                .map(MeterKey::meterId)
+                .collect(Collectors.toSet());
+        // Pick the first candidate according to the reuse strategy
+        MeterId meterId = getNextAvailableId(localAvailableMeterIds);
+        // Keep trying while candidates remain
+        while (meterId != null) {
+            // Reserve the id by removing its key from the shared set; a true result is treated as success
+            if (updateMeterIdAvailability(deviceId, meterId, false)) {
+                // Reserved, hand it out
+                return meterId;
+            }
+            // Could not reserve it: drop the candidate from the local snapshot
+            localAvailableMeterIds.remove(meterId);
+            // Pick the next candidate
+            meterId = getNextAvailableId(localAvailableMeterIds);
+        }
+        // No reusable id could be reserved
+        return null;
+    }
+
+    @Override
+    public MeterId allocateMeterId(DeviceId deviceId) {
+        // Locals: the reused id (if any) and the freshly generated numeric id
+        MeterId meterId;
+        long id;
+        // First try to reuse an id that was previously freed for this device
+        meterId = firstReusableMeterId(deviceId);
+        // A reusable id was reserved, hand it out
+        if (meterId != null) {
+            return meterId;
+        }
+        // No reusable id: generate a new value, bounded above by the device's
+        // maximum number of meters.
+        long maxMeters = getMaxMeters(MeterFeaturesKey.key(deviceId));
+        // A max of 0 means no meter features are stored for this device (or they report 0)
+        if (maxMeters == 0L) {
+            // Fall back to asking the device driver via queryMaxMeters.
+            maxMeters = queryMaxMeters(deviceId);
+        }
+        // Still 0: the limit is unknown, so no id can be allocated
+        if (maxMeters == 0L) {
+            return null;
+        }
+        // Draw the next value from this device's counter (incremented even if rejected below)
+        id = meterIdGenerators.incrementAndGet(deviceId);
+        // Reject values at or above maxMeters. NOTE(review): id == maxMeters is refused too - confirm intended
+        if (id >= maxMeters) {
+            return null;
+        }
+        // Wrap the value as a MeterId and return it
+        return MeterId.meterId(id);
+    }
+
+    @Override
+    public void freeMeterId(DeviceId deviceId, MeterId meterId) {
+        // Put the id back into the available set so it can be reused
+        updateMeterIdAvailability(deviceId, meterId, true);
+    }
+
     private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
         @Override
         public void event(MapEvent<MeterKey, MeterData> event) {