[AETHER-432][VOL-3263] Revisit meters subsystem to be fully distributed.

- Events are now propagated across the cluster.
- REF_COUNT_ZERO is now generated after the first stats interval.
- Southbound (SB) ops are offloaded to the installers.
- Installers use predictable executors.
- The MeterData API that exposes the origin node is deprecated and unused code is removed.
- Comments are added to parts of the code that are obscure.
- MeterManager and DistributedMeterStore unit tests are improved.
- Fix an issue in TestConsistentMap.

Change-Id: I0329f903e5fdc421f29ee33f8f8133f18c794d8f
(cherry picked from commit 89c130115df20c80de446aac9c48e838468028d4)
diff --git a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
index 1979c61..49a889d 100644
--- a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
+++ b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
@@ -15,9 +15,13 @@
  */
 package org.onosproject.net.meter.impl;
 
+import org.onlab.util.PredictableExecutor;
+import org.onlab.util.PredictableExecutor.PickyRunnable;
 import org.onlab.util.Tools;
 import org.onlab.util.TriConsumer;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.config.NetworkConfigRegistry;
@@ -59,12 +63,12 @@
 import java.util.Collection;
 import java.util.Dictionary;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.onlab.util.PredictableExecutor.newPredictableExecutor;
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.OsgiPropertyConstants.MM_FALLBACK_METER_POLL_FREQUENCY;
@@ -93,10 +97,11 @@
 public class MeterManager
         extends AbstractListenerProviderRegistry<MeterEvent, MeterListener, MeterProvider, MeterProviderService>
         implements MeterService, MeterProviderRegistry {
-
+    // Installer related objects
+    private PredictableExecutor meterInstallers;
     private static final String WORKER_PATTERN = "installer-%d";
     private static final String GROUP_THREAD_NAME = "onos/meter";
-
+    // Logging facility, meter store delegate and listener for device events.
     private final Logger log = getLogger(getClass());
     private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate();
     private final DeviceListener deviceListener = new InternalDeviceListener();
@@ -119,7 +124,11 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected NetworkConfigRegistry netCfgService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
     /** Number of worker threads. */
+    // TODO Set 0 to use the available processors
     private int numThreads = MM_NUM_THREADS_DEFAULT;
 
     /** Frequency (in seconds) for polling meters via fallback provider. */
@@ -128,19 +137,28 @@
     /** Purge entries associated with a device when the device goes offline. */
     private boolean purgeOnDisconnection = MM_PURGE_ON_DISCONNECTION_DEFAULT;
 
+    // Action triggered when the futures related to submit and withdrawal complete
     private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
 
-    private ExecutorService executorService;
-
+    // Meter provider reference
     private final MeterDriverProvider defaultProvider = new MeterDriverProvider();
 
+    // Node id used to verify who is in charge of the meter ops
+    // (usually one node can modify the internal state of the device)
+    private NodeId local;
+
     @Activate
     public void activate(ComponentContext context) {
         store.setDelegate(delegate);
         cfgService.registerProperties(getClass());
         eventDispatcher.addSink(MeterEvent.class, listenerRegistry);
         deviceService.addListener(deviceListener);
-
+        local = clusterService.getLocalNode().id();
+        // Consumer logic is the following:
+        // if the future completes exceptionally (storage exception), on error is called
+        // else if there is a reason for the failure, on error is called with the reason
+        // else if the reason is empty, on success is called
+        // In all cases the meter context, if present, is the one being notified
         onComplete = (request, result, error) -> {
                 request.context().ifPresent(c -> {
                     if (error != null) {
@@ -153,12 +171,9 @@
                         }
                     }
                 });
-
         };
 
         modified(context);
-        executorService = newFixedThreadPool(numThreads,
-                                             groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
         log.info("Started");
     }
 
@@ -178,7 +193,7 @@
         eventDispatcher.removeSink(MeterEvent.class);
         deviceService.removeListener(deviceListener);
         cfgService.unregisterProperties(getClass(), false);
-        executorService.shutdown();
+        meterInstallers.shutdown();
         log.info("Stopped");
     }
 
@@ -215,6 +230,11 @@
         } catch (NumberFormatException e) {
             numThreads = MM_NUM_THREADS_DEFAULT;
         }
+        if (meterInstallers != null) {
+            meterInstallers.shutdown();
+        }
+        meterInstallers = newPredictableExecutor(numThreads,
+                groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
     }
 
     @Override
@@ -230,19 +250,20 @@
     @Override
     public Meter submit(MeterRequest request) {
         checkNotNull(request, "request cannot be null.");
+        // Allocate an id and then submit the request
         MeterId id = allocateMeterId(request.deviceId());
-
         Meter.Builder mBuilder = DefaultMeter.builder()
                 .forDevice(request.deviceId())
                 .fromApp(request.appId())
                 .withBands(request.bands())
                 .withCellId(id)
                 .withUnit(request.unit());
-
         if (request.isBurst()) {
             mBuilder.burst();
         }
         DefaultMeter m = (DefaultMeter) mBuilder.build();
+        // Meter installation logic (happy ending case)
+        // PENDING -> stats -> ADDED -> future completes
         m.setState(MeterState.PENDING_ADD);
         store.storeMeter(m).whenComplete((result, error) ->
                                                  onComplete.accept(request, result, error));
@@ -262,8 +283,10 @@
         if (request.isBurst()) {
             mBuilder.burst();
         }
-
         DefaultMeter m = (DefaultMeter) mBuilder.build();
+        // Meter removal logic (happy ending case)
+        // PENDING -> stats -> removed from the map -> future completes
+        // There is no transition to the REMOVED state
         m.setState(MeterState.PENDING_REMOVE);
         store.deleteMeter(m).whenComplete((result, error) ->
                                                   onComplete.accept(request, result, error));
@@ -288,19 +311,19 @@
 
     @Override
     public MeterId allocateMeterId(DeviceId deviceId) {
-        // We delegate direclty to the store
+        // We delegate directly to the store
         return store.allocateMeterId(deviceId);
     }
 
     @Override
     public void freeMeterId(DeviceId deviceId, MeterId meterId) {
-        // We delegate direclty to the store
+        // We delegate directly to the store
         store.freeMeterId(deviceId, meterId);
     }
 
     @Override
     public void purgeMeters(DeviceId deviceId) {
-        // We delegate direclty to the store
+        // We delegate directly to the store
         store.purgeMeter(deviceId);
     }
 
@@ -325,6 +348,7 @@
 
         @Override
         public void pushMeterMetrics(DeviceId deviceId, Collection<Meter> meterEntries) {
+            // Each update on the store is reflected on this collection
             Collection<Meter> allMeters = store.getAllMeters(deviceId);
 
             Map<MeterId, Meter> meterEntriesMap = meterEntries.stream()
@@ -337,8 +361,8 @@
                         !meterEntriesMap.containsKey(m.id())) {
                     // The meter is missing in the device. Reinstall!
                     log.debug("Adding meter missing in device {} {}", deviceId, m);
-                    provider().performMeterOperation(deviceId,
-                                                     new MeterOperation(m, MeterOperation.Type.ADD));
+                    // offload the task to avoid the overloading of the sb threads
+                    meterInstallers.execute(new MeterInstaller(deviceId, m, MeterOperation.Type.ADD));
                 }
             });
 
@@ -347,7 +371,7 @@
                     .filter(md -> !allMeters.stream().anyMatch(m -> m.id().equals(md.getKey())))
                     .forEach(mio -> {
                         Meter meter = mio.getValue();
-                        // FIXME: Removing a meter is meaningfull for OpenFlow, but not for P4Runtime.
+                        // FIXME: Removing a meter is meaningful for OpenFlow, but not for P4Runtime.
                         // In P4Runtime meter cells cannot be removed. For the
                         // moment, we make the distinction between OpenFlow and
                         // P4Runtime by looking at the MeterCellType (always
@@ -355,21 +379,29 @@
                         if (meter.meterCellId().type() == MeterCellType.INDEX) {
                             // The meter is missing in onos. Uninstall!
                             log.debug("Remove meter in device not in onos {} {}", deviceId, mio.getKey());
-                            provider().performMeterOperation(deviceId,
-                                                             new MeterOperation(meter, MeterOperation.Type.REMOVE));
+                            // offload the task to avoid the overloading of the sb threads
+                            meterInstallers.execute(new MeterInstaller(deviceId, meter, MeterOperation.Type.REMOVE));
                         }
                     });
 
+            // Update the meter stats in the store (first time move the state from pending to added)
             meterEntries.stream()
                     .filter(m -> allMeters.stream()
                             .anyMatch(sm -> sm.deviceId().equals(deviceId) && sm.id().equals(m.id())))
                     .forEach(m -> store.updateMeterState(m));
 
             allMeters.forEach(m -> {
+                // FIXME: Installing a meter is meaningful for OpenFlow, but not for P4Runtime.
+                // It looks like this flow is used only for P4Runtime to emulate the installation
+                // since meters are already instantiated - we just need to modify the params.
                 if (m.state() == MeterState.PENDING_ADD) {
-                    provider().performMeterOperation(m.deviceId(),
-                                                     new MeterOperation(m,
-                                                                        MeterOperation.Type.MODIFY));
+                    // offload the task to avoid the overloading of the sb threads
+                    meterInstallers.execute(new MeterInstaller(m.deviceId(), m, MeterOperation.Type.MODIFY));
+                // Remove workflow. For OpenFlow, the meters have been removed from
+                // the device but are still in the store, so we purge them for good.
+                // P4Runtime devices, instead, do not remove the meter: the first workaround
+                // for P4Runtime avoids sending a remove op. We still reach this point
+                // and purge the meter from the store.
                 } else if (m.state() == MeterState.PENDING_REMOVE) {
                     store.deleteMeterNow(m);
                 }
@@ -393,25 +425,25 @@
         public void notify(MeterEvent event) {
             DeviceId deviceId = event.subject().deviceId();
             switch (event.type()) {
+                // REQ events will trigger a modification in the device.
+                // Mastership check is performed inside the installer
+                // to avoid the blocking of the RAFT threads
                 case METER_ADD_REQ:
-                    executorService.execute(new MeterInstaller(deviceId, event.subject(),
+                    meterInstallers.execute(new MeterInstaller(deviceId, event.subject(),
                                                                MeterOperation.Type.ADD));
                     break;
                 case METER_REM_REQ:
-                    executorService.execute(new MeterInstaller(deviceId, event.subject(),
+                    meterInstallers.execute(new MeterInstaller(deviceId, event.subject(),
                                                                MeterOperation.Type.REMOVE));
                     break;
+                // The following events are forwarded to the apps subscribed to meter events;
+                // installers are not involved in this task. The overhead of this op
+                // is negligible. Potentially we can introduce a store delegate thread.
                 case METER_ADDED:
-                    log.info("Meter added {}", event.subject());
-                    post(new MeterEvent(MeterEvent.Type.METER_ADDED, event.subject()));
-                    break;
                 case METER_REMOVED:
-                    log.info("Meter removed {}", event.subject());
-                    post(new MeterEvent(MeterEvent.Type.METER_REMOVED, event.subject()));
-                    break;
                 case METER_REFERENCE_COUNT_ZERO:
-                    log.debug("Meter reference count zero {}", event.subject());
-                    post(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO, event.subject()));
+                    log.debug("Post {} event {}", event.type(), event.subject());
+                    post(event);
                     break;
                 default:
                     log.warn("Unknown meter event {}", event.type());
@@ -419,10 +451,11 @@
 
         }
     }
+
     /**
      * Task that passes the meter down to the provider.
      */
-    private class MeterInstaller implements Runnable {
+    private class MeterInstaller implements PickyRunnable {
         private final DeviceId deviceId;
         private final Meter meter;
         private final MeterOperation.Type op;
@@ -435,6 +468,14 @@
 
         @Override
         public void run() {
+            // Check mastership and, only if this node is the master, execute the op on the device
+            log.debug("Meter {} request {}", op.name().toLowerCase(), meter);
+            NodeId master = mastershipService.getMasterFor(meter.deviceId());
+            if (!Objects.equals(local, master)) {
+                log.trace("Not the master of device {}, skipping installation of the meter {}",
+                        meter.deviceId(), meter.id());
+                return;
+            }
             MeterProvider p = getProvider(this.deviceId);
             if (p == null) {
                 log.error("Unable to recover {}'s provider", deviceId);
@@ -442,6 +483,11 @@
             }
             p.performMeterOperation(deviceId, new MeterOperation(meter, op));
         }
+
+        @Override
+        public int hint() {
+            return meter.id().hashCode();
+        }
     }
 
     private class InternalDeviceListener implements DeviceListener {
diff --git a/core/net/src/test/java/org/onosproject/net/meter/impl/MeterManagerTest.java b/core/net/src/test/java/org/onosproject/net/meter/impl/MeterManagerTest.java
index 030b9e6..52970dd 100644
--- a/core/net/src/test/java/org/onosproject/net/meter/impl/MeterManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/meter/impl/MeterManagerTest.java
@@ -19,6 +19,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -73,6 +74,7 @@
 import org.onosproject.store.meter.impl.DistributedMeterStore;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.TestStorageService;
+import org.osgi.service.component.ComponentContext;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -80,8 +82,12 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.Dictionary;
+import java.util.Hashtable;
 import java.util.concurrent.CompletableFuture;
 
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -115,7 +121,6 @@
             new DefaultDevice(PROGRAMMABLE_PROVIDER, PROGRAMMABLE_DID, Device.Type.SWITCH,
                     "", "", "", "", null, ANNOTATIONS);
 
-
     private MeterService service;
 
     // Test Driver service used during the tests
@@ -232,8 +237,6 @@
         meterStore = new DistributedMeterStore();
         // Let's initialize some internal services of the store
         TestUtils.setField(meterStore, "storageService", new TestStorageService());
-        TestUtils.setField(meterStore, "clusterService", new TestClusterService());
-        TestUtils.setField(meterStore, "mastershipService", new TestMastershipService());
         TestUtils.setField(meterStore, "driverService", driverService);
 
         // Inject TestApplicationId into the DistributedMeterStore serializer
@@ -252,16 +255,20 @@
         manager.deviceService = deviceService;
         manager.mastershipService = new TestMastershipService();
         manager.cfgService = new ComponentConfigAdapter();
-        TestUtils.setField(manager, "storageService", new TestStorageService());
+        manager.clusterService = new TestClusterService();
         // Init the reference of the registry
         registry = manager;
 
         manager.driverService = driverService;
 
         // Activate the manager
-        manager.activate(null);
-        // Initialize the test provider
+        Dictionary<String, Object> cfgDict = new Hashtable<>();
+        ComponentContext componentContext = EasyMock.createMock(ComponentContext.class);
+        expect(componentContext.getProperties()).andReturn(cfgDict);
+        replay(componentContext);
+        manager.activate(componentContext);
 
+        // Initialize the test provider
         provider = new TestProvider(PID);
         // Register the provider against the manager
         providerService = registry.register(provider);
@@ -315,8 +322,8 @@
         // Submit meter request
         manager.submit(m1Request.add());
         // Verify add
-        assertTrue("The meter was not added", manager.getAllMeters().size() == 1);
-        assertTrue("The meter was not added", manager.getMeters(did1).size() == 1);
+        assertEquals("The meter was not added", 1, manager.getAllMeters().size());
+        assertEquals("The meter was not added", 1, manager.getMeters(did1).size());
         // Get Meter
         Meter installingMeter = manager.getMeter(did1, mid1);
         // Verify add of installingMeter and pending add state
@@ -329,8 +336,8 @@
         Meter installedMeter = manager.getMeter(did1, mid1);
         // Verify installation
         assertThat(installedMeter.state(), is(MeterState.ADDED));
-        assertTrue("The meter was not installed", manager.getAllMeters().size() == 1);
-        assertTrue("The meter was not installed", manager.getMeters(did1).size() == 1);
+        assertEquals("The meter was not installed", 1, manager.getAllMeters().size());
+        assertEquals("The meter was not installed", 1, manager.getMeters(did1).size());
     }
 
     /**
@@ -348,14 +355,14 @@
         Meter withdrawingMeter = manager.getMeter(did1, mid1);
         // Verify withdrawing
         assertThat(withdrawingMeter.state(), is(MeterState.PENDING_REMOVE));
-        assertTrue("The meter was not withdrawn", manager.getAllMeters().size() == 1);
-        assertTrue("The meter was not withdrawn", manager.getMeters(did1).size() == 1);
+        assertEquals("The meter was not withdrawn", 1, manager.getAllMeters().size());
+        assertEquals("The meter was not withdrawn", 1, manager.getMeters(did1).size());
         // Let's simulate a working data-plane
         pushMetrics(MeterOperation.Type.REMOVE, withdrawingMeter);
         // Verify withdrawn
         assertNull(manager.getMeter(did1, mid1));
-        assertTrue("The meter was not removed", manager.getAllMeters().size() == 0);
-        assertTrue("The meter was not removed", manager.getMeters(did1).size() == 0);
+        assertEquals("The meter was not removed", 0, manager.getAllMeters().size());
+        assertEquals("The meter was not removed", 0, manager.getMeters(did1).size());
     }
 
     /**
@@ -370,9 +377,9 @@
         // Submit meter 2
         manager.submit(m2Request.add());
         // Verify add
-        assertTrue("The meter was not added", manager.getAllMeters().size() == 2);
-        assertTrue("The meter was not added", manager.getMeters(did1).size() == 1);
-        assertTrue("The meter was not added", manager.getMeters(did2).size() == 1);
+        assertEquals("The meter was not added", 2, manager.getAllMeters().size());
+        assertEquals("The meter was not added", 1, manager.getMeters(did1).size());
+        assertEquals("The meter was not added", 1, manager.getMeters(did2).size());
         // Get Meters
         Meter installingMeter1 = manager.getMeter(did1, mid1);
         Meter installingMeter2 = manager.getMeter(did2, mid1);
@@ -391,9 +398,9 @@
         // Verify installation
         assertThat(installedMeter1.state(), is(MeterState.ADDED));
         assertThat(installedMeter2.state(), is(MeterState.ADDED));
-        assertTrue("The meter was not installed", manager.getAllMeters().size() == 2);
-        assertTrue("The meter was not installed", manager.getMeters(did1).size() == 1);
-        assertTrue("The meter was not installed", manager.getMeters(did2).size() == 1);
+        assertEquals("The meter was not installed", 2, manager.getAllMeters().size());
+        assertEquals("The meter was not installed", 1, manager.getMeters(did1).size());
+        assertEquals("The meter was not installed", 1, manager.getMeters(did2).size());
     }
 
     /**
@@ -417,73 +424,78 @@
         // Verify withdrawing
         assertThat(withdrawingMeter1.state(), is(MeterState.PENDING_REMOVE));
         assertThat(withdrawingMeter2.state(), is(MeterState.PENDING_REMOVE));
-        assertTrue("The meter was not withdrawn", manager.getAllMeters().size() == 2);
-        assertTrue("The meter was not withdrawn", manager.getMeters(did1).size() == 1);
-        assertTrue("The meter was not withdrawn", manager.getMeters(did2).size() == 1);
+        assertEquals("The meter was not withdrawn", 2, manager.getAllMeters().size());
+        assertEquals("The meter was not withdrawn", 1, manager.getMeters(did1).size());
+        assertEquals("The meter was not withdrawn", 1, manager.getMeters(did2).size());
         // Let's simulate a working data-plane
         pushMetrics(MeterOperation.Type.REMOVE, withdrawingMeter1);
         pushMetrics(MeterOperation.Type.REMOVE, withdrawingMeter2);
         // Verify withdrawn
         assertNull(manager.getMeter(did1, mid1));
         assertNull(manager.getMeter(did2, mid1));
-        assertTrue("The meter was not removed", manager.getAllMeters().size() == 0);
-        assertTrue("The meter was not removed", manager.getMeters(did1).size() == 0);
-        assertTrue("The meter was not removed", manager.getMeters(did2).size() == 0);
+        assertEquals("The meter was not removed", 0, manager.getAllMeters().size());
+        assertEquals("The meter was not removed", 0, manager.getMeters(did1).size());
+        assertEquals("The meter was not removed", 0, manager.getMeters(did2).size());
+    }
+
+    /**
+     * Test purge meter.
+     */
+    @Test
+    public void testPurge() {
+        // Init store
+        initMeterStore();
+        // Submit meter request
+        manager.submit(m1Request.add());
+        // Verify submit
+        Meter submittingMeter = manager.getMeter(did1, mid1);
+        assertThat(submittingMeter.state(), is(MeterState.PENDING_ADD));
+        assertEquals("The meter was not added", 1, manager.getAllMeters().size());
+        assertEquals("The meter was not added", 1, manager.getMeters(did1).size());
+        // Purge the meters
+        manager.purgeMeters(did1);
+        // Verify purge
+        assertNull(manager.getMeter(did1, mid1));
+        assertEquals("The meter was not purged", 0, manager.getAllMeters().size());
+        assertEquals("The meter was not purged", 0, manager.getMeters(did1).size());
     }
 
     @Test
     public void testAddFromMeterProgrammable()  {
-
         // Init store
         initMeterStore();
-
         manager.submit(mProgrammableRequest.add());
-
         TestTools.assertAfter(500, () -> {
-
-            assertTrue("The meter was not added", manager.getAllMeters().size() == 1);
-
+            assertEquals("The meter was not added", 1, manager.getAllMeters().size());
             assertThat(manager.getMeter(PROGRAMMABLE_DID, MeterId.meterId(1)), is(mProgrammable));
         });
-
     }
 
     @Test
     public void testAddBatchFromMeterProgrammable()  {
-
         // Init store
         initMeterStore();
-
         List<MeterOperation> operations = ImmutableList.of(new MeterOperation(mProgrammable, MeterOperation.Type.ADD));
         manager.defaultProvider().performMeterOperation(PROGRAMMABLE_DID, new MeterOperations(operations));
-
         TestTools.assertAfter(500, () -> {
-
-            assertTrue("The meter was not added", meterOperations.size() == 1);
-
-            assertTrue("Wrong Meter Operation", meterOperations.get(0).meter().id().equals(mProgrammable.id()));
+            assertEquals("The meter was not added", 1, meterOperations.size());
+            assertEquals("Wrong Meter Operation", mProgrammable.id(), meterOperations.get(0).meter().id());
         });
 
     }
 
     @Test
     public void testGetFromMeterProgrammable()  {
-
         // Init store
         initMeterStore();
-
         MeterDriverProvider fallback = (MeterDriverProvider) manager.defaultProvider();
-
         testAddFromMeterProgrammable();
-
         fallback.init(manager.deviceService, fallback.meterProviderService, manager.mastershipService, 1);
-
         TestTools.assertAfter(2000, () -> {
-            assertTrue("The meter was not added", manager.getAllMeters().size() == 1);
+            assertEquals("The meter was not added", 1, manager.getAllMeters().size());
             Meter m = manager.getMeters(PROGRAMMABLE_DID).iterator().next();
             assertEquals("incorrect state", MeterState.ADDED, m.state());
         });
-
     }
 
     // Test cluster service