[SDFAB-500][SDFAB-499] Implement user defined index mode for the meter service

- Introduce a boolean to control the meter service modes
- User defined mode does not provide any coordination to the apps
- Only one mode can be active at a time
- In addition, some sanity checks are performed by the meter service
- Update existing unit tests and add new ones to test the new behaviors
- Initial clean up of the meters subsystems

Change-Id: I61500b794f27e94abd11637c84bce0dbb2e073f3
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
index 7f7873c..afc6d94 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
@@ -83,6 +83,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.onosproject.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
 import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
 import static org.onosproject.net.meter.MeterCellId.MeterCellType.INDEX;
@@ -115,7 +116,7 @@
     // Meters id related objects
     private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
     // Available meter identifiers
-    private ConcurrentMap<MeterTableKey, DistributedSet<MeterKey>> availableMeterIds;
+    protected ConcurrentMap<MeterTableKey, DistributedSet<MeterKey>> availableMeterIds;
     // Atomic counter map for generation of new identifiers;
     private static final String METERIDSTORE = "onos-meters-id-store";
     private AtomicCounterMap<MeterTableKey> meterIdGenerators;
@@ -130,7 +131,12 @@
             .register(Band.Type.class)
             .register(MeterState.class)
             .register(Meter.Unit.class)
-            .register(MeterFailReason.class);
+            .register(MeterFailReason.class)
+            .register(MeterTableKey.class)
+            .register(MeterFeatures.class)
+            .register(DefaultMeterFeatures.class)
+            .register(MeterFeaturesFlag.class)
+            .register(MeterScope.class);
     private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -143,6 +149,9 @@
     private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
             Maps.newConcurrentMap();
 
+    // Control the user defined index mode for the store.
+    protected boolean userDefinedIndexMode = false;
+
     /**
      * Defines possible selection strategies to reuse meter ids.
      */
@@ -170,16 +179,7 @@
         metersFeatures = storageService.<MeterTableKey, MeterFeatures>eventuallyConsistentMapBuilder()
                 .withName(METERFEATURESSTORE)
                 .withTimestampProvider((key, features) -> new WallClockTimestamp())
-                .withSerializer(KryoNamespace.newBuilder()
-                                .register(KryoNamespaces.API)
-                                .register(MeterTableKey.class)
-                                .register(MeterFeatures.class)
-                                .register(DefaultMeterFeatures.class)
-                                .register(DefaultBand.class)
-                                .register(Band.Type.class)
-                                .register(Meter.Unit.class)
-                                .register(MeterFailReason.class)
-                                .register(MeterFeaturesFlag.class)).build();
+                .withSerializer(APP_KRYO_BUILDER).build();
         metersFeatures.addListener(featuresMapListener);
         // Init the map of the available ids set
         // Set will be created when a new Meter Features is pushed to the store
@@ -190,6 +190,7 @@
                 .withSerializer(Serializer.using(KryoNamespaces.API,
                                                  MeterTableKey.class,
                                                  MeterScope.class)).build();
+
         log.info("Started");
     }
 
@@ -199,15 +200,15 @@
         metersFeatures.removeListener(featuresMapListener);
         meters.destroy();
         metersFeatures.destroy();
-        availableMeterIds.forEach((key, set) -> {
-            set.destroy();
-        });
+        availableMeterIds.forEach((key, set) -> set.destroy());
+
         log.info("Stopped");
     }
 
     @Override
     public CompletableFuture<MeterStoreResult> addOrUpdateMeter(Meter meter) {
-        // Init steps
+        // Verify integrity of the index
+        checkArgument(validIndex(meter), "Meter index is not valid");
         CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
         MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
         MeterData data = new MeterData(meter, null);
@@ -227,23 +228,7 @@
 
     @Override
     public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
-        // Init steps
-        CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
-        MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
-        // Store the future related to the operation
-        futures.put(key, future);
-        // Store the meter data
-        MeterData data = new MeterData(meter, null);
-        try {
-            meters.put(key, data);
-        } catch (StorageException e) {
-            log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
-                    e.getMessage(), e);
-            futures.remove(key);
-            future.completeExceptionally(e);
-        }
-        // Done, return the future
-        return future;
+        return addOrUpdateMeter(meter);
     }
 
     @Override
@@ -309,9 +294,7 @@
             Set<MeterTableKey> keys = metersFeatures.keySet().stream()
                                         .filter(key -> key.deviceId().equals(deviceId))
                                         .collect(Collectors.toUnmodifiableSet());
-            keys.forEach(k -> {
-                metersFeatures.remove(k);
-            });
+            keys.forEach(k -> metersFeatures.remove(k));
         } catch (StorageException e) {
             log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
                         e.getMessage(), e);
@@ -342,27 +325,6 @@
     }
 
     @Override
-    // TODO Should we remove it ? We are not using it
-    public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
-        CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
-        MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
-        futures.put(key, future);
-
-        MeterData data = new MeterData(meter, null);
-        try {
-            if (meters.computeIfPresent(key, (k, v) -> data) == null) {
-                future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
-            }
-        } catch (StorageException e) {
-            log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
-                    e.getMessage(), e);
-            futures.remove(key);
-            future.completeExceptionally(e);
-        }
-        return future;
-    }
-
-    @Override
     public Meter updateMeterState(Meter meter) {
         // Update meter if present (stats workflow)
         MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
@@ -405,7 +367,7 @@
     @Override
     public void failedMeter(MeterOperation op, MeterFailReason reason) {
         // Meter ops failed (got notification from the sb)
-        MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
+        MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().meterCellId());
         meters.computeIfPresent(key, (k, v) -> new MeterData(v.meter(), reason));
     }
 
@@ -419,7 +381,7 @@
     public void purgeMeter(Meter m) {
         // Once we receive the ack from the sb
         // create the key and remove definitely the meter
-        MeterKey key = MeterKey.key(m.deviceId(), m.id());
+        MeterKey key = MeterKey.key(m.deviceId(), m.meterCellId());
         try {
             if (Versioned.valueOrNull(meters.remove(key)) != null) {
                 // Free the id
@@ -441,6 +403,12 @@
 
     @Override
     public void purgeMeter(DeviceId deviceId) {
+        // This method is renamed in onos-2.5; it only delegates to purgeMeters
+        purgeMeters(deviceId);
+    }
+
+    @Override
+    public void purgeMeters(DeviceId deviceId) {
         List<Versioned<MeterData>> metersPendingRemove = meters.stream()
                 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
                 .map(Map.Entry::getValue)
@@ -456,7 +424,19 @@
                         e.getValue().value().meter().appId().equals(appId))
                 .map(Map.Entry::getValue)
                 .collect(Collectors.toList());
-        metersPendingRemove.forEach(versionedMeterKey -> deleteMeterNow(versionedMeterKey.value().meter()));
+        metersPendingRemove.forEach(versionedMeterKey
+                -> purgeMeter(versionedMeterKey.value().meter()));
+    }
+
+    @Override
+    public boolean userDefinedIndexMode(boolean enable) {
+        if (meters.isEmpty() && meterIdGenerators.isEmpty()) { // switch only while nothing is allocated
+            userDefinedIndexMode = enable;
+        } else {
+            log.warn("Unable to {} user defined index mode as store did " +
+                    "already some allocations", enable ? "activate" : "deactivate");
+        }
+        return userDefinedIndexMode;
     }
 
     @Override
@@ -473,6 +453,28 @@
         return features == null ? 0L : features.maxMeter();
     }
 
+    // Returns true when the meter cell index lies within the index range of its meter features.
+    private boolean validIndex(Meter meter) {
+        long index;
+        MeterTableKey key;
+        if (meter.meterCellId().type() == PIPELINE_INDEPENDENT) {
+            PiMeterCellId piMeterCellId = (PiMeterCellId) meter.meterCellId();
+            index = piMeterCellId.index();
+            key = MeterTableKey.key(meter.deviceId(), MeterScope.of(piMeterCellId.meterId().id()));
+        } else if (meter.meterCellId().type() == INDEX) {
+            MeterId meterId = (MeterId) meter.meterCellId();
+            index = meterId.id();
+            key = MeterTableKey.key(meter.deviceId(), MeterScope.globalScope());
+        } else {
+            return false;
+        }
+
+        MeterFeatures features = metersFeatures.get(key);
+        // No features for this table means no known range: reject every index, including -1
+        return features != null
+                && index >= features.startIndex() && index <= features.endIndex();
+    }
+
     private long getStartIndex(MeterTableKey key) {
         // Leverage the meter features to know the start id
         // Since we are using index now
@@ -556,7 +558,7 @@
         Set<MeterCellId> localAvailableMeterIds = keySet.stream()
                 .filter(meterKey ->
                     meterKey.deviceId().equals(meterTableKey.deviceId()))
-                .map(MeterKey::meterId)
+                .map(MeterKey::meterCellId)
                 .collect(Collectors.toSet());
         // Get next available id
         MeterCellId meterId = getNextAvailableId(localAvailableMeterIds);
@@ -584,6 +586,10 @@
 
     @Override
     public MeterCellId allocateMeterId(DeviceId deviceId, MeterScope meterScope) {
+        if (userDefinedIndexMode) {
+            log.warn("Unable to allocate meter id when user defined index mode is enabled");
+            return null;
+        }
         MeterTableKey meterTableKey = MeterTableKey.key(deviceId, meterScope);
         MeterCellId meterCellId;
         long id;
@@ -638,6 +644,10 @@
     }
 
     private void freeMeterId(MeterTableKey meterTableKey, MeterCellId meterCellId) {
+        if (userDefinedIndexMode) {
+            log.warn("Unable to free meter id when user defined index mode is enabled");
+            return;
+        }
         long index;
         if (meterCellId.type() == PIPELINE_INDEPENDENT) {
             PiMeterCellId piMeterCellId = (PiMeterCellId) meterCellId;