[AETHER-432][VOL-3263] Revisit meters subsystem to be fully distributed.

- Events are now propagated across the cluster.
- REF_COUNT_ZERO is now generated after the first stats interval.
- Southbound (SB) ops are offloaded to the installers.
- Installers use predictable executors.
- Deprecated the MeterData API that exposes the origin node and removed unused code.
- Comments are added to parts of the code that are obscure.
- MeterManager and DistributedMeterStore unit tests are improved.
- Fix an issue in TestConsistentMap.

Change-Id: I0329f903e5fdc421f29ee33f8f8133f18c794d8f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
index 0b4e2a4..9aec726 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
@@ -21,9 +21,6 @@
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.math.RandomUtils;
 import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.behaviour.MeterQuery;
 import org.onosproject.net.driver.DriverHandler;
@@ -87,11 +84,24 @@
 
     private Logger log = getLogger(getClass());
 
+    // Meters map related objects
     private static final String METERSTORE = "onos-meter-store";
-    private static final String METERFEATURESSTORE = "onos-meter-features-store";
-    private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
-    private static final String METERIDSTORE = "onos-meters-id-store";
+    private ConsistentMap<MeterKey, MeterData> meters;
+    private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
 
+    // Meters features related objects
+    private static final String METERFEATURESSTORE = "onos-meter-features-store";
+    private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
+
+    // Meters id related objects
+    private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
+    // Available meter identifiers
+    private DistributedSet<MeterKey> availableMeterIds;
+    // Atomic counter map for generation of new identifiers
+    private static final String METERIDSTORE = "onos-meters-id-store";
+    private AtomicCounterMap<DeviceId> meterIdGenerators;
+
+    // Serializer related objects
     private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
             .register(KryoNamespaces.API)
             .register(MeterKey.class)
@@ -102,37 +112,18 @@
             .register(MeterState.class)
             .register(Meter.Unit.class)
             .register(MeterFailReason.class);
-
     private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     private StorageService storageService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    private MastershipService mastershipService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    private ClusterService clusterService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DriverService driverService;
 
-    private ConsistentMap<MeterKey, MeterData> meters;
-    private NodeId local;
-
-    private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
-
-    private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
-
+    // Local cache to handle async ops through futures.
     private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
             Maps.newConcurrentMap();
 
-    // Available meter identifiers
-    private DistributedSet<MeterKey> availableMeterIds;
-
-    // Atomic counter map for generation of new identifiers;
-    private AtomicCounterMap<DeviceId> meterIdGenerators;
-
     /**
      * Defines possible selection strategies to reuse meter ids.
      */
@@ -146,19 +137,16 @@
          */
         FIRST_FIT
     }
-
     private ReuseStrategy reuseStrategy = FIRST_FIT;
 
     @Activate
     public void activate() {
-        local = clusterService.getLocalNode().id();
-
+        // Init meters map and setup the map listener
         meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
                     .withName(METERSTORE)
                     .withSerializer(serializer).build();
-
         meters.addListener(mapListener);
-
+        // Init meter features map (meaningful only for OpenFlow protocol)
         meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
                 .withName(METERFEATURESSTORE)
                 .withSerializer(Serializer.using(KryoNamespaces.API,
@@ -169,19 +157,16 @@
                                                  Meter.Unit.class,
                                                  MeterFailReason.class,
                                                  MeterFeaturesFlag.class)).build();
-
         // Init the set of the available ids
         availableMeterIds = new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
                 .withName(AVAILABLEMETERIDSTORE)
                 .withSerializer(Serializer.using(KryoNamespaces.API,
                                                  MeterKey.class)).build(),
                 DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
-
         // Init atomic map counters
         meterIdGenerators = storageService.<DeviceId>atomicCounterMapBuilder()
                 .withName(METERIDSTORE)
                 .withSerializer(Serializer.using(KryoNamespaces.API)).build();
-
         log.info("Started");
     }
 
@@ -199,10 +184,12 @@
         // Store the future related to the operation
         futures.put(key, future);
         // Store the meter data
-        MeterData data = new MeterData(meter, null, local);
+        MeterData data = new MeterData(meter, null);
         try {
             meters.put(key, data);
         } catch (StorageException e) {
+            log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
+                    e.getMessage(), e);
             futures.remove(key);
             future.completeExceptionally(e);
         }
@@ -218,7 +205,7 @@
         // Store the future related to the operation
         futures.put(key, future);
         // Create the meter data
-        MeterData data = new MeterData(meter, null, local);
+        MeterData data = new MeterData(meter, null);
         // Update the state of the meter. It will be pruned by observing
         // that it has been removed from the dataplane.
         try {
@@ -228,6 +215,8 @@
                 future.complete(MeterStoreResult.success());
             }
         } catch (StorageException e) {
+            log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
+                    e.getMessage(), e);
             futures.remove(key);
             future.completeExceptionally(e);
         }
@@ -237,11 +226,14 @@
 
     @Override
     public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
+        // Store meter features, this is done once for each device
         MeterStoreResult result = MeterStoreResult.success();
         MeterFeaturesKey key = MeterFeaturesKey.key(meterfeatures.deviceId());
         try {
             meterFeatures.putIfAbsent(key, meterfeatures);
         } catch (StorageException e) {
+            log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
+                    e.getMessage(), e);
             result = MeterStoreResult.fail(TIMEOUT);
         }
         return result;
@@ -249,28 +241,34 @@
 
     @Override
     public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
+        // Remove meter features - these ops are meaningful only for OpenFlow
         MeterStoreResult result = MeterStoreResult.success();
         MeterFeaturesKey key = MeterFeaturesKey.key(deviceId);
         try {
             meterFeatures.remove(key);
         } catch (StorageException e) {
+            log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
+                    e.getMessage(), e);
             result = MeterStoreResult.fail(TIMEOUT);
         }
         return result;
     }
 
     @Override
+    // TODO: Should we remove it? We are not using it.
     public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
         CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
         MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
         futures.put(key, future);
 
-        MeterData data = new MeterData(meter, null, local);
+        MeterData data = new MeterData(meter, null);
         try {
             if (meters.computeIfPresent(key, (k, v) -> data) == null) {
                 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
             }
         } catch (StorageException e) {
+            log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
+                    e.getMessage(), e);
             futures.remove(key);
             future.completeExceptionally(e);
         }
@@ -279,6 +277,7 @@
 
     @Override
     public void updateMeterState(Meter meter) {
+        // Update meter if present (stats workflow)
         MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
         meters.computeIfPresent(key, (k, v) -> {
             DefaultMeter m = (DefaultMeter) v.meter();
@@ -291,10 +290,7 @@
             m.setLife(meter.life());
             // TODO: Prune if drops to zero.
             m.setReferenceCount(meter.referenceCount());
-            if (meter.referenceCount() == 0) {
-                notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO, m));
-            }
-            return new MeterData(m, null, v.origin());
+            return new MeterData(m, null);
         });
     }
 
@@ -320,41 +316,42 @@
 
     @Override
     public void failedMeter(MeterOperation op, MeterFailReason reason) {
+        // Meter ops failed (got notification from the sb)
         MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
-        meters.computeIfPresent(key, (k, v) ->
-                new MeterData(v.meter(), reason, v.origin()));
+        meters.computeIfPresent(key, (k, v) -> new MeterData(v.meter(), reason));
     }
 
     @Override
     public void deleteMeterNow(Meter m) {
-        // Create the key
+        // Once we receive the ack from the sb,
+        // create the key and definitively remove the meter
         MeterKey key = MeterKey.key(m.deviceId(), m.id());
-        // Remove the future
-        futures.remove(key);
-        // Remove the meter
-        if (Versioned.valueOrNull(meters.remove(key)) != null) {
-            // Free the id
-            freeMeterId(m.deviceId(), m.id());
-            // Finally notify the delegate
-            notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, m));
+        try {
+            if (Versioned.valueOrNull(meters.remove(key)) != null) {
+                // Free the id
+                freeMeterId(m.deviceId(), m.id());
+            }
+        } catch (StorageException e) {
+            log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
+                    e.getMessage(), e);
         }
     }
 
     @Override
     public void purgeMeter(DeviceId deviceId) {
-
+        // Purge api (typically used when the device is offline)
         List<Versioned<MeterData>> metersPendingRemove = meters.stream()
                 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
                 .map(Map.Entry::getValue)
                 .collect(Collectors.toList());
-
+        // Definitively remove each meter of the device
         metersPendingRemove.forEach(versionedMeterKey
                 -> deleteMeterNow(versionedMeterKey.value().meter()));
-
     }
 
     @Override
     public long getMaxMeters(MeterFeaturesKey key) {
+        // Leverage the meter features to know the max id
         MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
         return features == null ? 0L : features.maxMeter();
     }
@@ -465,45 +462,45 @@
         updateMeterIdAvailability(deviceId, meterId, true);
     }
 
+    // Enables the distribution of events across the cluster
     private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
         @Override
         public void event(MapEvent<MeterKey, MeterData> event) {
             MeterKey key = event.key();
             Versioned<MeterData> value = event.type() == MapEvent.Type.REMOVE ? event.oldValue() : event.newValue();
             MeterData data = value.value();
-            NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
+            MeterData oldData = Versioned.valueOrNull(event.oldValue());
             switch (event.type()) {
                 case INSERT:
                 case UPDATE:
                         switch (data.meter().state()) {
                             case PENDING_ADD:
                             case PENDING_REMOVE:
-                                if (!data.reason().isPresent() && local.equals(master)) {
-                                    notifyDelegate(
-                                            new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
-                                                    MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
-                                                                  data.meter()));
-                                } else if (data.reason().isPresent() && local.equals(data.origin())) {
-                                    MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
-                                    //TODO: No future -> no friend
-                                    futures.get(key).complete(msr);
-                                }
-                                break;
-                            case ADDED:
-                                if (local.equals(data.origin()) &&
-                                        (data.meter().state() == MeterState.PENDING_ADD
-                                                || data.meter().state() == MeterState.ADDED)) {
+                                // Two cases. If there is a reason, the meter operation failed.
+                                // Otherwise, we are ready to install/remove through the delegate.
+                                if (data.reason().isEmpty()) {
+                                    notifyDelegate(new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
+                                        MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ, data.meter()));
+                                } else {
                                     futures.computeIfPresent(key, (k, v) -> {
-                                        v.complete(MeterStoreResult.success());
-                                        notifyDelegate(
-                                                new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
+                                        v.complete(MeterStoreResult.fail(data.reason().get()));
                                         return null;
                                     });
                                 }
                                 break;
-                            case REMOVED:
-                                if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
-                                    futures.remove(key).complete(MeterStoreResult.success());
+                            case ADDED:
+                                // Transition from pending to installed
+                                if (data.meter().state() == MeterState.ADDED &&
+                                        (oldData != null && oldData.meter().state() == MeterState.PENDING_ADD)) {
+                                    futures.computeIfPresent(key, (k, v) -> {
+                                        v.complete(MeterStoreResult.success());
+                                        return null;
+                                    });
+                                    notifyDelegate(new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
+                                // Update stats case
+                                } else if (data.meter().referenceCount() == 0) {
+                                    notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO,
+                                            data.meter()));
                                 }
                                 break;
                             default:
@@ -511,7 +508,13 @@
                         }
                     break;
                 case REMOVE:
-                    //Only happens at origin so we do not need to care.
+                    // Meter removal case
+                    futures.computeIfPresent(key, (k, v) -> {
+                        v.complete(MeterStoreResult.success());
+                        return null;
+                    });
+                    // Finally notify the delegate
+                    notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, data.meter()));
                     break;
                 default:
                     log.warn("Unknown Map event type {}", event.type());
@@ -520,5 +523,4 @@
         }
     }
 
-
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/MeterData.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/MeterData.java
index 8f87c2c..f392122 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/MeterData.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/MeterData.java
@@ -30,12 +30,29 @@
     private final Optional<MeterFailReason> reason;
     private final NodeId origin;
 
+    /**
+     * Builds up a meter data.
+     * @param meter the meter
+     * @param reason the reason of the failure
+     * @param origin the node from which the request is originated
+     * @deprecated in ONOS 2.2
+     */
+    @Deprecated
     public MeterData(Meter meter, MeterFailReason reason, NodeId origin) {
         this.meter = meter;
         this.reason = Optional.ofNullable(reason);
         this.origin = origin;
     }
 
+    /**
+     * Builds up a meter data.
+     * @param meter the meter
+     * @param reason the reason of the failure
+     */
+    public MeterData(Meter meter, MeterFailReason reason) {
+        this(meter, reason, null);
+    }
+
     public Meter meter() {
         return meter;
     }
@@ -44,6 +61,12 @@
         return this.reason;
     }
 
+    /**
+     * Returns the origin node.
+     * @return the node id of the origin node
+     * @deprecated in ONOS 2.2
+     */
+    @Deprecated
     public NodeId origin() {
         return this.origin;
     }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
index 5005787..2566803 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
@@ -17,7 +17,6 @@
 package org.onosproject.store.meter.impl;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -25,11 +24,7 @@
 import org.onlab.packet.IpAddress;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.TestApplicationId;
-import org.onosproject.cluster.ClusterServiceAdapter;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.mastership.MastershipServiceAdapter;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.behaviour.MeterQuery;
 import org.onosproject.net.driver.Behaviour;
@@ -52,7 +47,6 @@
 
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
@@ -127,8 +121,6 @@
         meterStore = new DistributedMeterStore();
         // Let's initialize some internal services
         TestUtils.setField(meterStore, "storageService", new TestStorageService());
-        TestUtils.setField(meterStore, "clusterService", new TestClusterService());
-        TestUtils.setField(meterStore, "mastershipService", new TestMastershipService());
         TestUtils.setField(meterStore, "driverService", new TestDriverService());
 
         // Inject TestApplicationId into the DistributedMeterStore serializer
@@ -408,29 +400,19 @@
         assertNull(meterStore.getMeter(keyOne));
     }
 
-    // Test cluster service
-    private final class TestClusterService extends ClusterServiceAdapter {
-
-        private ControllerNode local = new DefaultControllerNode(NID_LOCAL, LOCALHOST);
-
-        @Override
-        public ControllerNode getLocalNode() {
-            return local;
-        }
-
-        @Override
-        public Set<ControllerNode> getNodes() {
-            return Sets.newHashSet();
-        }
-
-    }
-
-    // Test mastership service
-    private final class TestMastershipService extends MastershipServiceAdapter {
-        @Override
-        public NodeId getMasterFor(DeviceId deviceId) {
-            return NID_LOCAL;
-        }
+    /**
+     * Test purge meter.
+     */
+    @Test
+    public void testPurgeMeter() {
+        // add the meter
+        testStoreMeter();
+        meterStore.purgeMeter(did1);
+        // Verify delete
+        MeterKey keyOne = MeterKey.key(did1, mid1);
+        assertThat(0, is(meterStore.getAllMeters().size()));
+        assertThat(0, is(meterStore.getAllMeters(did1).size()));
+        assertNull(meterStore.getMeter(keyOne));
     }
 
     // Test class for driver service.