[SDFAB-352][SDFAB-353] Retrieve MeterFeatures from the P4RT southbound, Extend MeterProviderService and revisit MeterStore

Change-Id: If0dae53643988cb551ff5020abd792cb6d33ff6b
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
index 11570f7..2fdba6f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
@@ -31,6 +31,7 @@
 import org.onosproject.net.meter.DefaultMeter;
 import org.onosproject.net.meter.DefaultMeterFeatures;
 import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterCellId;
 import org.onosproject.net.meter.MeterEvent;
 import org.onosproject.net.meter.MeterFailReason;
 import org.onosproject.net.meter.MeterFeatures;
@@ -39,10 +40,14 @@
 import org.onosproject.net.meter.MeterId;
 import org.onosproject.net.meter.MeterKey;
 import org.onosproject.net.meter.MeterOperation;
+import org.onosproject.net.meter.MeterScope;
 import org.onosproject.net.meter.MeterState;
 import org.onosproject.net.meter.MeterStore;
 import org.onosproject.net.meter.MeterStoreDelegate;
 import org.onosproject.net.meter.MeterStoreResult;
+import org.onosproject.net.meter.MeterTableKey;
+import org.onosproject.net.pi.model.PiMeterId;
+import org.onosproject.net.pi.runtime.PiMeterCellId;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.primitives.DefaultDistributedSet;
 import org.onosproject.store.serializers.KryoNamespaces;
@@ -50,12 +55,16 @@
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.DistributedPrimitive;
 import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageException;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.WallClockTimestamp;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -64,6 +73,8 @@
 import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -73,6 +84,8 @@
 
 import static org.onosproject.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
 import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
+import static org.onosproject.net.meter.MeterCellId.MeterCellType.INDEX;
+import static org.onosproject.net.meter.MeterCellId.MeterCellType.PIPELINE_INDEPENDENT;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -88,20 +101,22 @@
     // Meters map related objects
     private static final String METERSTORE = "onos-meter-store";
     private ConsistentMap<MeterKey, MeterData> meters;
-    private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
+    private MapEventListener<MeterKey, MeterData> metersMapListener = new InternalMetersMapEventListener();
     private Map<MeterKey, MeterData> metersMap;
 
     // Meters features related objects
     private static final String METERFEATURESSTORE = "onos-meter-features-store";
-    private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
+    private EventuallyConsistentMap<MeterTableKey, MeterFeatures> metersFeatures;
+    private EventuallyConsistentMapListener<MeterTableKey, MeterFeatures> featuresMapListener =
+        new InternalFeaturesMapEventListener();
 
     // Meters id related objects
     private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
     // Available meter identifiers
-    private DistributedSet<MeterKey> availableMeterIds;
+    private ConcurrentMap<MeterTableKey, DistributedSet<MeterKey>> availableMeterIds;
     // Atomic counter map for generation of new identifiers;
     private static final String METERIDSTORE = "onos-meters-id-store";
-    private AtomicCounterMap<DeviceId> meterIdGenerators;
+    private AtomicCounterMap<MeterTableKey> meterIdGenerators;
 
     // Serializer related objects
     private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
@@ -147,35 +162,44 @@
         meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
                     .withName(METERSTORE)
                     .withSerializer(serializer).build();
-        meters.addListener(mapListener);
-        // Init meter features map (meaningful only for OpenFlow protocol)
-        meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
-                .withName(METERFEATURESSTORE)
-                .withSerializer(Serializer.using(KryoNamespaces.API,
-                                                 MeterFeaturesKey.class,
-                                                 MeterFeatures.class,
-                                                 DefaultMeterFeatures.class,
-                                                 Band.Type.class,
-                                                 Meter.Unit.class,
-                                                 MeterFailReason.class,
-                                                 MeterFeaturesFlag.class)).build();
+        meters.addListener(metersMapListener);
         metersMap = meters.asJavaMap();
-        // Init the set of the available ids
-        availableMeterIds = new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
-                .withName(AVAILABLEMETERIDSTORE)
-                .withSerializer(Serializer.using(KryoNamespaces.API,
-                                                 MeterKey.class)).build(),
-                DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
+        // Init meter features map
+        metersFeatures = storageService.<MeterTableKey, MeterFeatures>eventuallyConsistentMapBuilder()
+                .withName(METERFEATURESSTORE)
+                .withTimestampProvider((key, features) -> new WallClockTimestamp())
+                .withSerializer(KryoNamespace.newBuilder()
+                                .register(KryoNamespaces.API)
+                                .register(MeterTableKey.class)
+                                .register(MeterFeatures.class)
+                                .register(DefaultMeterFeatures.class)
+                                .register(DefaultBand.class)
+                                .register(Band.Type.class)
+                                .register(Meter.Unit.class)
+                                .register(MeterFailReason.class)
+                                .register(MeterFeaturesFlag.class)).build();
+        metersFeatures.addListener(featuresMapListener);
+        // Init the map of the available ids set
+        // Set will be created when a new Meter Features is pushed to the store
+        availableMeterIds = new ConcurrentHashMap<>();
         // Init atomic map counters
-        meterIdGenerators = storageService.<DeviceId>atomicCounterMapBuilder()
+        meterIdGenerators = storageService.<MeterTableKey>atomicCounterMapBuilder()
                 .withName(METERIDSTORE)
-                .withSerializer(Serializer.using(KryoNamespaces.API)).build();
+                .withSerializer(Serializer.using(KryoNamespaces.API,
+                                                 MeterTableKey.class,
+                                                 MeterScope.class)).build();
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
-        meters.removeListener(mapListener);
+        meters.removeListener(metersMapListener);
+        metersFeatures.removeListener(featuresMapListener);
+        meters.destroy();
+        metersFeatures.destroy();
+        availableMeterIds.forEach((key, set) -> {
+            set.destroy();
+        });
         log.info("Stopped");
     }
 
@@ -229,11 +253,11 @@
 
     @Override
     public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
-        // Store meter features, this is done once for each device
+        // Store meter features, this is done once for each features of every device
         MeterStoreResult result = MeterStoreResult.success();
-        MeterFeaturesKey key = MeterFeaturesKey.key(meterfeatures.deviceId());
+        MeterTableKey key = MeterTableKey.key(meterfeatures.deviceId(), meterfeatures.scope());
         try {
-            meterFeatures.putIfAbsent(key, meterfeatures);
+            metersFeatures.put(key, meterfeatures);
         } catch (StorageException e) {
             log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
                     e.getMessage(), e);
@@ -244,16 +268,20 @@
 
     @Override
     public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
-        // Remove meter features - these ops are meaningful only for OpenFlow
         MeterStoreResult result = MeterStoreResult.success();
-        MeterFeaturesKey key = MeterFeaturesKey.key(deviceId);
         try {
-            meterFeatures.remove(key);
+            Set<MeterTableKey> keys = metersFeatures.keySet().stream()
+                                        .filter(key -> key.deviceId().equals(deviceId))
+                                        .collect(Collectors.toUnmodifiableSet());
+            keys.forEach(k -> {
+                metersFeatures.remove(k);
+            });
         } catch (StorageException e) {
             log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
-                    e.getMessage(), e);
+                        e.getMessage(), e);
             result = MeterStoreResult.fail(TIMEOUT);
         }
+
         return result;
     }
 
@@ -327,13 +355,27 @@
 
     @Override
     public void deleteMeterNow(Meter m) {
+        // This method is renamed in onos-2.5
+        purgeMeter(m);
+    }
+
+    @Override
+    public void purgeMeter(Meter m) {
         // Once we receive the ack from the sb
         // create the key and remove definitely the meter
         MeterKey key = MeterKey.key(m.deviceId(), m.id());
         try {
             if (Versioned.valueOrNull(meters.remove(key)) != null) {
                 // Free the id
-                freeMeterId(m.deviceId(), m.id());
+                MeterScope scope;
+                if (m.meterCellId().type() == PIPELINE_INDEPENDENT) {
+                    PiMeterCellId piMeterCellId = (PiMeterCellId) m.meterCellId();
+                    scope = MeterScope.of(piMeterCellId.meterId().id());
+                } else {
+                    scope = MeterScope.globalScope();
+                }
+                MeterTableKey meterTableKey = MeterTableKey.key(m.deviceId(), scope);
+                freeMeterId(meterTableKey, m.meterCellId());
             }
         } catch (StorageException e) {
             log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
@@ -350,16 +392,41 @@
                 .collect(Collectors.toList());
         // Remove definitely the meter
         metersPendingRemove.forEach(versionedMeterKey
-                -> deleteMeterNow(versionedMeterKey.value().meter()));
+                -> purgeMeter(versionedMeterKey.value().meter()));
     }
 
     @Override
     public long getMaxMeters(MeterFeaturesKey key) {
         // Leverage the meter features to know the max id
-        MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
+        // Create a Meter Table key with FeaturesKey's device and global scope
+        MeterTableKey meterTableKey = MeterTableKey.key(key.deviceId(), MeterScope.globalScope());
+        return getMaxMeters(meterTableKey);
+    }
+
+    private long getMaxMeters(MeterTableKey key) {
+        // Leverage the meter features to know the max id
+        MeterFeatures features = metersFeatures.get(key);
         return features == null ? 0L : features.maxMeter();
     }
 
+    private long getStartIndex(MeterTableKey key) {
+        // Leverage the meter features to know the start id
+        // Since we are using index now
+        // if there is no features related to the key
+        // -1 is returned
+        MeterFeatures features = metersFeatures.get(key);
+        return features == null ? -1L : features.startIndex();
+    }
+
+    private long getEndIndex(MeterTableKey key) {
+        // Leverage the meter features to know the max id
+        // Since we are using index now
+        // if there is no features related to the key
+        // -1 is returned
+        MeterFeatures features = metersFeatures.get(key);
+        return features == null ? -1L : features.endIndex();
+    }
+
     // queryMaxMeters is implemented in FullMetersAvailable behaviour.
     private long queryMaxMeters(DeviceId device) {
         // Get driver handler for this device
@@ -371,18 +438,31 @@
         }
         // Get the behavior
         MeterQuery query = handler.behaviour(MeterQuery.class);
+        // Insert a new available key set to the map
+        String setName = AVAILABLEMETERIDSTORE + "-" + device + "global";
+        MeterTableKey meterTableKey = MeterTableKey.key(device, MeterScope.globalScope());
+        insertAvailableKeySet(meterTableKey, setName);
         // Return as max meter the result of the query
         return query.getMaxMeters();
     }
 
-    private boolean updateMeterIdAvailability(DeviceId deviceId, MeterId id,
+    private boolean updateMeterIdAvailability(MeterTableKey meterTableKey, MeterCellId id,
                                               boolean available) {
+        // Retrieve the set first
+        DistributedSet<MeterKey> keySet = availableMeterIds.get(meterTableKey);
+        if (keySet == null) {
+            // A reusable set should be inserted when a features is pushed
+            log.warn("Reusable Key set for device: {} scope: {} not found",
+                meterTableKey.deviceId(), meterTableKey.scope());
+            return false;
+        }
         // According to available, make available or unavailable a meter key
-        return available ? availableMeterIds.add(MeterKey.key(deviceId, id)) :
-                availableMeterIds.remove(MeterKey.key(deviceId, id));
+        DeviceId deviceId = meterTableKey.deviceId();
+        return available ? keySet.add(MeterKey.key(deviceId, id)) :
+                keySet.remove(MeterKey.key(deviceId, id));
     }
 
-    private MeterId getNextAvailableId(Set<MeterId> availableIds) {
+    private MeterCellId getNextAvailableId(Set<MeterCellId> availableIds) {
         // If there are no available ids
         if (availableIds.isEmpty()) {
             // Just end the cycle
@@ -399,18 +479,27 @@
     }
 
     // Implements reuse strategy
-    private MeterId firstReusableMeterId(DeviceId deviceId) {
+    private MeterCellId firstReusableMeterId(MeterTableKey meterTableKey) {
+        // Create a Table key and use it to retrieve the reusable meterCellId set
+        DistributedSet<MeterKey> keySet = availableMeterIds.get(meterTableKey);
+        if (keySet == null) {
+            // A reusable set should be inserted when a features is pushed
+            log.warn("Reusable Key set for device: {} scope: {} not found",
+                meterTableKey.deviceId(), meterTableKey.scope());
+            return null;
+        }
         // Filter key related to device id, and reduce to meter ids
-        Set<MeterId> localAvailableMeterIds = availableMeterIds.stream()
-                .filter(meterKey -> meterKey.deviceId().equals(deviceId))
+        Set<MeterCellId> localAvailableMeterIds = keySet.stream()
+                .filter(meterKey ->
+                    meterKey.deviceId().equals(meterTableKey.deviceId()))
                 .map(MeterKey::meterId)
                 .collect(Collectors.toSet());
         // Get next available id
-        MeterId meterId = getNextAvailableId(localAvailableMeterIds);
+        MeterCellId meterId = getNextAvailableId(localAvailableMeterIds);
         // Iterate until there are items
         while (meterId != null) {
             // If we are able to reserve the id
-            if (updateMeterIdAvailability(deviceId, meterId, false)) {
+            if (updateMeterIdAvailability(meterTableKey, meterId, false)) {
                 // Just end
                 return meterId;
             }
@@ -425,49 +514,86 @@
 
     @Override
     public MeterId allocateMeterId(DeviceId deviceId) {
-        // Init steps
-        MeterId meterId;
+        // We use global scope for MeterId
+        return (MeterId) allocateMeterId(deviceId, MeterScope.globalScope());
+    }
+
+    @Override
+    public MeterCellId allocateMeterId(DeviceId deviceId, MeterScope meterScope) {
+        MeterTableKey meterTableKey = MeterTableKey.key(deviceId, meterScope);
+        MeterCellId meterCellId;
         long id;
-        // Try to reuse meter id
-        meterId = firstReusableMeterId(deviceId);
-        // We found a reusable id, return
-        if (meterId != null) {
-            return meterId;
+        // First, search for reusable key
+        meterCellId = firstReusableMeterId(meterTableKey);
+        if (meterCellId != null) {
+            // A reusable key is found
+            return meterCellId;
         }
-        // If there was no reusable MeterId we have to generate a new value
-        // using maxMeters as upper limit.
-        long maxMeters = getMaxMeters(MeterFeaturesKey.key(deviceId));
+        // If there was no reusable meter id we have to generate a new value
+        // using start and end index as lower and upper bound respectively.
+        long startIndex = getStartIndex(meterTableKey);
+        long endIndex = getEndIndex(meterTableKey);
         // If the device does not give us MeterFeatures
-        if (maxMeters == 0L) {
+        if (startIndex == -1L || endIndex == -1L) {
             // MeterFeatures couldn't be retrieved, fallback to queryMeters.
-            maxMeters = queryMaxMeters(deviceId);
-        }
-        // If we don't know the max, cannot proceed
-        if (maxMeters == 0L) {
-            return null;
+            // Only meaningful to OpenFLow
+            long maxMeters = queryMaxMeters(deviceId);
+            // If we don't know the max, cannot proceed
+            if (maxMeters == 0L) {
+                return null;
+            } else {
+                // OpenFlow meter index starts from 1, ends with max-1
+                startIndex = 1L;
+                endIndex = maxMeters - 1;
+            }
         }
         // Get a new value
-        id = meterIdGenerators.incrementAndGet(deviceId);
-        // Check with the max, and if the value is bigger, cannot proceed
-        if (id >= maxMeters) {
+        // If the value is smaller than the start index, get another one
+        do {
+            id = meterIdGenerators.incrementAndGet(meterTableKey);
+        } while (id < startIndex);
+        // Check with the end index, and if the value is bigger, cannot proceed
+        if (id > endIndex) {
             return null;
         }
         // Done, return the value
-        return MeterId.meterId(id);
+        // If we are using global scope, return a MeterId
+        // Else, return a PiMeterId
+        if (meterScope.isGlobal()) {
+            return MeterId.meterId(id);
+        } else {
+            return PiMeterCellId.ofIndirect(PiMeterId.of(meterScope.id()), id);
+        }
+
     }
 
     @Override
     public void freeMeterId(DeviceId deviceId, MeterId meterId) {
+        MeterTableKey meterTableKey = MeterTableKey.key(deviceId, MeterScope.globalScope());
+        freeMeterId(meterTableKey, meterId);
+    }
+
+    private void freeMeterId(MeterTableKey meterTableKey, MeterCellId meterCellId) {
+        long index;
+        if (meterCellId.type() == PIPELINE_INDEPENDENT) {
+            PiMeterCellId piMeterCellId = (PiMeterCellId) meterCellId;
+            index = piMeterCellId.index();
+        } else if (meterCellId.type() == INDEX) {
+            MeterId meterId = (MeterId) meterCellId;
+            index = meterId.id();
+        } else {
+            return;
+        }
         // Avoid to free meter not allocated
-        if (meterIdGenerators.get(deviceId) < meterId.id()) {
+        if (meterIdGenerators.get(meterTableKey) < index) {
             return;
         }
         // Update the availability
-        updateMeterIdAvailability(deviceId, meterId, true);
+        updateMeterIdAvailability(meterTableKey, meterCellId, true);
     }
 
     // Enabling the events distribution across the cluster
-    private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
+    private class InternalMetersMapEventListener implements MapEventListener<MeterKey, MeterData> {
         @Override
         public void event(MapEvent<MeterKey, MeterData> event) {
             MeterKey key = event.key();
@@ -523,8 +649,42 @@
                 default:
                     log.warn("Unknown Map event type {}", event.type());
             }
-
         }
     }
 
+    private class InternalFeaturesMapEventListener implements
+        EventuallyConsistentMapListener<MeterTableKey, MeterFeatures> {
+        @Override
+        public void event(EventuallyConsistentMapEvent<MeterTableKey, MeterFeatures> event) {
+            MeterTableKey meterTableKey = event.key();
+            MeterFeatures meterFeatures = event.value();
+            switch (event.type()) {
+                case PUT:
+                    // Put a new available meter id set to the map
+                    String setName = AVAILABLEMETERIDSTORE + "-" +
+                        meterFeatures.deviceId() + meterFeatures.scope().id();
+                    insertAvailableKeySet(meterTableKey, setName);
+                    break;
+                case REMOVE:
+                    // Remove the set
+                    DistributedSet<MeterKey> set = availableMeterIds.remove(meterTableKey);
+                    if (set != null) {
+                        set.destroy();
+                    }
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    private void insertAvailableKeySet(MeterTableKey meterTableKey, String setName) {
+        DistributedSet<MeterKey> availableMeterIdSet =
+            new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
+                .withName(setName)
+                .withSerializer(Serializer.using(KryoNamespaces.API,
+                                                MeterKey.class)).build(),
+                DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
+        availableMeterIds.put(meterTableKey, availableMeterIdSet);
+    }
 }