[ONOS-7114] Meter Subsystem Refactoring
Changes:
- Moves meter counters in the store
- Uses atomic counter map for meter counters
- Implements atomic counter map adapter and test atomic counter map
- Introduces free meter id
- Changes allocate meter id
- Implements unit tests for MeterManager and MeterStore
Change-Id: I45e3debc0e43ca7edcf6e3b4065866634f76f9f7
diff --git a/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java b/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
index 3d007071..9b5eae8 100644
--- a/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
+++ b/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
@@ -42,7 +42,6 @@
*/
CompletableFuture<MeterStoreResult> deleteMeter(Meter meter);
-
/**
* Adds the meter features to the store.
*
@@ -59,7 +58,6 @@
*/
MeterStoreResult deleteMeterFeatures(DeviceId deviceId);
-
/**
* Updates a meter whose meter id is the same as the passed meter.
*
@@ -124,13 +122,20 @@
long getMaxMeters(MeterFeaturesKey key);
/**
- * Returns the first available MeterId from previously removed meter.
- * This method allows allocating MeterIds below the actual meterIdCounter
- * value.
+ * Allocates the first available MeterId.
*
* @param deviceId the device id
- * @return the meter Id or null if none exist
+ * @return the meter Id or null if it was not possible
+ * to allocate a meter id
*/
- MeterId firstReusableMeterId(DeviceId deviceId);
+ MeterId allocateMeterId(DeviceId deviceId);
+
+ /**
+ * Frees the given meter id.
+ *
+ * @param deviceId the device id
+ * @param meterId the id to be freed
+ */
+ void freeMeterId(DeviceId deviceId, MeterId meterId);
}
diff --git a/core/api/src/test/java/org/onosproject/store/service/AtomicCounterMapAdapter.java b/core/api/src/test/java/org/onosproject/store/service/AtomicCounterMapAdapter.java
new file mode 100644
index 0000000..df7d270
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/AtomicCounterMapAdapter.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.service;
+
+/**
+ * Testing adapter for the atomic counter map.
+ */
+public class AtomicCounterMapAdapter<K> implements AtomicCounterMap<K> {
+
+ @Override
+ public long incrementAndGet(K key) {
+ return 0;
+ }
+
+ @Override
+ public long decrementAndGet(K key) {
+ return 0;
+ }
+
+ @Override
+ public long getAndIncrement(K key) {
+ return 0;
+ }
+
+ @Override
+ public long getAndDecrement(K key) {
+ return 0;
+ }
+
+ @Override
+ public long addAndGet(K key, long delta) {
+ return 0;
+ }
+
+ @Override
+ public long getAndAdd(K key, long delta) {
+ return 0;
+ }
+
+ @Override
+ public long get(K key) {
+ return 0;
+ }
+
+ @Override
+ public long put(K key, long newValue) {
+ return 0;
+ }
+
+ @Override
+ public long putIfAbsent(K key, long newValue) {
+ return 0;
+ }
+
+ @Override
+ public boolean replace(K key, long expectedOldValue, long newValue) {
+ return false;
+ }
+
+ @Override
+ public long remove(K key) {
+ return 0;
+ }
+
+ @Override
+ public boolean remove(K key, long value) {
+ return false;
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return false;
+ }
+
+ @Override
+ public void clear() {
+
+ }
+
+ @Override
+ public String name() {
+ return null;
+ }
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounterMap.java b/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounterMap.java
new file mode 100644
index 0000000..ef1769c
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounterMap.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.service;
+
+import com.google.common.util.concurrent.AtomicLongMap;
+
+/**
+ * Test implementation of the atomic counter map.
+ */
+public final class TestAtomicCounterMap<K> extends AtomicCounterMapAdapter<K> {
+
+ // Map name
+ private final String atomicCounterMapName;
+ // Atomic long map from guava
+ private AtomicLongMap<K> map;
+
+ private TestAtomicCounterMap(String name) {
+ // Init name, map using create
+ atomicCounterMapName = name;
+ map = AtomicLongMap.create();
+ }
+
+ @Override
+ public long incrementAndGet(K key) {
+ // Forward directly to the guava map
+ return map.incrementAndGet(key);
+ }
+
+ @Override
+ public long decrementAndGet(K key) {
+ // Forward directly to the guava map
+ return map.decrementAndGet(key);
+ }
+
+ @Override
+ public long getAndIncrement(K key) {
+ // Forward directly to the guava map
+ return map.getAndIncrement(key);
+ }
+
+ @Override
+ public long getAndDecrement(K key) {
+ // Forward directly to the guava map
+ return map.getAndDecrement(key);
+ }
+
+ @Override
+ public long addAndGet(K key, long delta) {
+ // Forward directly to the guava map
+ return map.addAndGet(key, delta);
+ }
+
+ @Override
+ public long getAndAdd(K key, long delta) {
+ // Forward directly to the guava map
+ return map.getAndAdd(key, delta);
+ }
+
+ @Override
+ public long get(K key) {
+ // Forward directly to the guava map
+ return map.get(key);
+ }
+
+ @Override
+ public long put(K key, long newValue) {
+ // Forward directly to the guava map
+ return map.put(key, newValue);
+ }
+
+ @Override
+ // Coarse implementation, should we take the lock ?
+ public long putIfAbsent(K key, long newValue) {
+ // If it does not exist
+ if (!map.containsKey(key)) {
+ // Just do a put
+ return map.put(key, newValue);
+ } else {
+ // Let's return the existing value
+ return map.get(key);
+ }
+ }
+
+ @Override
+ // Coarse implementation, should we take the lock ?
+ public boolean replace(K key, long expectedOldValue, long newValue) {
+ // If the value exists and it the expected one
+ if (map.containsKey(key) && map.get(key) == expectedOldValue) {
+ // Let's put the value
+ map.put(key, newValue);
+ // Done, return true
+ return true;
+
+ } else if (!map.containsKey(key) && expectedOldValue == 0) {
+ // If the value does not exist, and old value is 0
+ map.put(key, newValue);
+ // Done, return true
+ return true;
+ } else {
+ // replace is not possible, just return false
+ return false;
+ }
+ }
+
+ @Override
+ public long remove(K key) {
+ // Forward directly to the guava map
+ return map.remove(key);
+ }
+
+ @Override
+ // Coarse implementation, should we take the lock ?
+ public boolean remove(K key, long value) {
+ // If the value exists and it is equal to value
+ if (map.containsKey(key) && map.get(key) == value) {
+ // Let's remove the value
+ map.remove(key);
+ // Done, return true
+ return true;
+ } else {
+ // remove is not possible, just return false
+ return false;
+ }
+ }
+
+ @Override
+ public int size() {
+ return map.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ @Override
+ public void clear() {
+ map.clear();
+ }
+
+ @Override
+ public String name() {
+ return atomicCounterMapName;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder<K> extends AtomicCounterMapBuilder<K> {
+
+ @Override
+ public AtomicCounterMap<K> build() {
+ return new TestAtomicCounterMap<>(name());
+ }
+
+ @Override
+ public AsyncAtomicCounterMap<K> buildAsyncMap() {
+ return null;
+ }
+ }
+
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestStorageService.java b/core/api/src/test/java/org/onosproject/store/service/TestStorageService.java
index ab5f2ee..e41d2ad 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestStorageService.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestStorageService.java
@@ -62,4 +62,9 @@
public <T> Topic<T> getTopic(String name, Serializer serializer) {
return new TestTopic(name);
}
+
+ @Override
+ public <K> AtomicCounterMapBuilder<K> atomicCounterMapBuilder() {
+ return TestAtomicCounterMap.builder();
+ }
}
diff --git a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
index 1173f1d..0d7ab42 100644
--- a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
+++ b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.net.meter.impl;
-import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -25,15 +24,11 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.TriConsumer;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.behaviour.MeterQuery;
-import org.onosproject.net.driver.DriverHandler;
-import org.onosproject.net.driver.DriverService;
import org.onosproject.net.meter.DefaultMeter;
import org.onosproject.net.meter.Meter;
import org.onosproject.net.meter.MeterEvent;
import org.onosproject.net.meter.MeterFailReason;
import org.onosproject.net.meter.MeterFeatures;
-import org.onosproject.net.meter.MeterFeaturesKey;
import org.onosproject.net.meter.MeterId;
import org.onosproject.net.meter.MeterKey;
import org.onosproject.net.meter.MeterListener;
@@ -49,8 +44,6 @@
import org.onosproject.net.meter.MeterStoreResult;
import org.onosproject.net.provider.AbstractListenerProviderRegistry;
import org.onosproject.net.provider.AbstractProviderService;
-import org.onosproject.store.service.AtomicCounter;
-import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.util.Collection;
@@ -72,7 +65,6 @@
extends AbstractListenerProviderRegistry<MeterEvent, MeterListener, MeterProvider, MeterProviderService>
implements MeterService, MeterProviderRegistry {
- private static final String METERCOUNTERIDENTIFIER = "meter-id-counter-%s";
private static final String NUM_THREAD = "numThreads";
private static final String WORKER_PATTERN = "installer-%d";
private static final String GROUP_THREAD_NAME = "onos/meter";
@@ -87,16 +79,7 @@
private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MeterStore store;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DriverService driverService;
-
- private Map<DeviceId, AtomicCounter> meterIdCounters
- = Maps.newConcurrentMap();
+ private MeterStore store;
private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
@@ -198,54 +181,9 @@
return store.getAllMeters();
}
- private long queryMeters(DeviceId device) {
- DriverHandler handler = driverService.createHandler(device);
- if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
- return 0L;
- }
- MeterQuery query = handler.behaviour(MeterQuery.class);
- return query.getMaxMeters();
- }
-
private MeterId allocateMeterId(DeviceId deviceId) {
- // We first query the store for any previously removed meterId that could
- // be reused. Receiving a value (not null) already means that meters
- // are available for the device.
- MeterId meterid = store.firstReusableMeterId(deviceId);
- if (meterid != null) {
- return meterid;
- }
- // If there was no reusable MeterId we have to generate a new value
- // with an upper limit in maxMeters.
- long maxMeters = store.getMaxMeters(MeterFeaturesKey.key(deviceId));
- if (maxMeters == 0L) {
- // MeterFeatures couldn't be retrieved, trying with queryMeters.
- // queryMeters is implemented in FullMetersAvailable behaviour.
- maxMeters = queryMeters(deviceId);
- }
-
- if (maxMeters == 0L) {
- throw new IllegalStateException("Meters not supported by device " + deviceId);
- }
-
- final long mmeters = maxMeters;
- long id = meterIdCounters.compute(deviceId, (k, v) -> {
- if (v == null) {
- return allocateCounter(k);
- }
- if (v.get() >= mmeters) {
- throw new IllegalStateException("Maximum number of meters " +
- meterIdCounters.get(deviceId).get() +
- " reached for device " + deviceId);
- }
- return v;
- }).incrementAndGet();
-
- return MeterId.meterId(id);
- }
-
- private AtomicCounter allocateCounter(DeviceId deviceId) {
- return storageService.getAtomicCounter(String.format(METERCOUNTERIDENTIFIER, deviceId));
+ // We delegate direclty to the store
+ return store.allocateMeterId(deviceId);
}
private class InternalMeterProviderService
diff --git a/core/net/src/test/java/org/onosproject/net/meter/impl/MeterManagerTest.java b/core/net/src/test/java/org/onosproject/net/meter/impl/MeterManagerTest.java
index cc19cd0..51d3384 100644
--- a/core/net/src/test/java/org/onosproject/net/meter/impl/MeterManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/meter/impl/MeterManagerTest.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.net.meter.impl;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
@@ -27,8 +26,6 @@
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.common.event.impl.TestEventDispatcher;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.DefaultApplicationId;
import org.onosproject.incubator.store.meter.impl.DistributedMeterStore;
import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.net.DeviceId;
@@ -38,7 +35,7 @@
import org.onosproject.net.meter.DefaultMeterFeatures;
import org.onosproject.net.meter.DefaultMeterRequest;
import org.onosproject.net.meter.Meter;
-import org.onosproject.net.meter.MeterFeaturesKey;
+import org.onosproject.net.meter.MeterFeatures;
import org.onosproject.net.meter.MeterId;
import org.onosproject.net.meter.MeterOperation;
import org.onosproject.net.meter.MeterOperations;
@@ -46,16 +43,13 @@
import org.onosproject.net.meter.MeterProviderRegistry;
import org.onosproject.net.meter.MeterProviderService;
import org.onosproject.net.meter.MeterRequest;
-import org.onosproject.net.meter.MeterService;
import org.onosproject.net.meter.MeterState;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.service.TestStorageService;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import static org.hamcrest.Matchers.is;
@@ -69,194 +63,295 @@
*/
public class MeterManagerTest {
- private static final ProviderId PID = new ProviderId("of", "foo");
+ // Test node id
private static final NodeId NID_LOCAL = new NodeId("local");
+
+ // Test ip address
private static final IpAddress LOCALHOST = IpAddress.valueOf("127.0.0.1");
- private MeterService service;
- private MeterManager manager;
- private DistributedMeterStore meterStore;
- private MeterProviderRegistry registry;
- private MeterProviderService providerService;
+ // Provider id used during the tests
+ private static final ProviderId PID = new ProviderId("of", "foo");
+ // Test provider used during the tests
private TestProvider provider;
- private ApplicationId appId;
+ // Meter manager
+ private MeterManager manager;
+ // Meter provider registry
+ private MeterProviderRegistry registry;
- private Meter m1;
- private Meter m2;
- private MeterRequest.Builder m1Request;
- private MeterRequest.Builder m2Request;
- private MeterRequest.Builder m3Request;
+ // Meter provider service
+ private MeterProviderService providerService;
- private Map<MeterId, Meter> meters = Maps.newHashMap();
+ // Store under testing
+ private DistributedMeterStore meterStore;
+
+ // Device ids used during the tests
+ private DeviceId did1 = did("1");
+ private DeviceId did2 = did("2");
+
+ // Meter ids used during the tests
+ private MeterId mid1 = MeterId.meterId(1);
+
+ // Bands used during the tests
+ private Band b1 = DefaultBand.builder()
+ .ofType(Band.Type.DROP)
+ .withRate(500)
+ .build();
+
+ // Meters used during the tests
+ private Meter m1 = DefaultMeter.builder()
+ .forDevice(did1)
+ .fromApp(APP_ID)
+ .withId(mid1)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+ private Meter m2 = DefaultMeter.builder()
+ .forDevice(did2)
+ .fromApp(APP_ID)
+ .withId(mid1)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+
+ // Meter requests used during the tests
+ private MeterRequest.Builder m1Request = DefaultMeterRequest.builder()
+ .forDevice(did1)
+ .fromApp(APP_ID)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1));
+ private MeterRequest.Builder m2Request = DefaultMeterRequest.builder()
+ .forDevice(did2)
+ .fromApp(APP_ID)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1));
+
+ // Meter features used during the tests
+ private MeterFeatures mef1 = DefaultMeterFeatures.builder().forDevice(did1)
+ .withMaxMeters(3L)
+ .withBandTypes(new HashSet<>())
+ .withUnits(new HashSet<>())
+ .hasStats(false)
+ .hasBurst(false)
+ .withMaxBands((byte) 0)
+ .withMaxColors((byte) 0)
+ .build();
+ private MeterFeatures mef2 = DefaultMeterFeatures.builder().forDevice(did2)
+ .withMaxMeters(10L)
+ .withBandTypes(new HashSet<>())
+ .withUnits(new HashSet<>())
+ .hasStats(false)
+ .hasBurst(false)
+ .withMaxBands((byte) 0)
+ .withMaxColors((byte) 0)
+ .build();
@Before
- public void setup() throws Exception {
+ public void setup() {
+ // Init step for the store
meterStore = new DistributedMeterStore();
+ // Let's initialize some internal services of the store
TestUtils.setField(meterStore, "storageService", new TestStorageService());
TestUtils.setField(meterStore, "clusterService", new TestClusterService());
TestUtils.setField(meterStore, "mastershipService", new TestMastershipService());
+ // Activate the store
meterStore.activate();
- meterStore.storeMeterFeatures(DefaultMeterFeatures.builder().forDevice(did("1"))
- .withMaxMeters(255L)
- .withBandTypes(new HashSet<>())
- .withUnits(new HashSet<>())
- .hasStats(false)
- .hasBurst(false)
- .withMaxBands((byte) 0)
- .withMaxColors((byte) 0)
- .build());
- meterStore.storeMeterFeatures(DefaultMeterFeatures.builder().forDevice(did("2"))
- .withMaxMeters(2)
- .withBandTypes(new HashSet<>())
- .withUnits(new HashSet<>())
- .hasBurst(false)
- .hasStats(false)
- .withMaxBands((byte) 0)
- .withMaxColors((byte) 0)
- .build());
-
+ // Init step for the manager
manager = new MeterManager();
- manager.store = meterStore;
- TestUtils.setField(manager, "storageService", new TestStorageService());
+ // Let's initialize some internal services of the manager
+ TestUtils.setField(manager, "store", meterStore);
injectEventDispatcher(manager, new TestEventDispatcher());
- service = manager;
+ // Init the reference of the registry
registry = manager;
-
+ // Activate the manager
manager.activate();
-
+ // Initialize the test provider
provider = new TestProvider(PID);
+ // Register the provider against the manager
providerService = registry.register(provider);
-
- appId = new TestApplicationId(0, "MeterManagerTest");
-
+ // Verify register
assertTrue("provider should be registered",
registry.getProviders().contains(provider.id()));
-
- Band band = DefaultBand.builder()
- .ofType(Band.Type.DROP)
- .withRate(500)
- .build();
-
- m1 = DefaultMeter.builder()
- .forDevice(did("1"))
- .fromApp(APP_ID)
- .withId(MeterId.meterId(1))
- .withUnit(Meter.Unit.KB_PER_SEC)
- .withBands(Collections.singletonList(band))
- .build();
-
- m2 = DefaultMeter.builder()
- .forDevice(did("2"))
- .fromApp(APP_ID)
- .withId(MeterId.meterId(1))
- .withUnit(Meter.Unit.KB_PER_SEC)
- .withBands(Collections.singletonList(band))
- .build();
-
- m1Request = DefaultMeterRequest.builder()
- .forDevice(did("1"))
- .fromApp(APP_ID)
- .withUnit(Meter.Unit.KB_PER_SEC)
- .withBands(Collections.singletonList(band));
-
- m2Request = DefaultMeterRequest.builder()
- .forDevice(did("2"))
- .fromApp(APP_ID)
- .withUnit(Meter.Unit.KB_PER_SEC)
- .withBands(Collections.singletonList(band));
-
- m3Request = DefaultMeterRequest.builder()
- .forDevice(did("1"))
- .fromApp(APP_ID)
- .withUnit(Meter.Unit.KB_PER_SEC)
- .withBands(Collections.singletonList(band));
-
}
@After
public void tearDown() {
+ // Unregister provider
registry.unregister(provider);
+ // Verify unregister
assertFalse("provider should not be registered",
registry.getProviders().contains(provider.id()));
-
+ // Deactivate manager
manager.deactivate();
+ // Remove event dispatcher
injectEventDispatcher(manager, null);
-
+ // Deactivate store
+ meterStore.deactivate();
}
+ private void initMeterStore() {
+ // Let's store feature for device 1
+ meterStore.storeMeterFeatures(mef1);
+ // Let's store feature for device 2
+ meterStore.storeMeterFeatures(mef2);
+ }
+
+ // Emulate metrics coming from the dataplane
+ private void pushMetrics(MeterOperation.Type type, Meter meter) {
+ // If it is an add operation
+ if (type == MeterOperation.Type.ADD) {
+ // Update state to added
+ ((DefaultMeter) meter).setState(MeterState.ADDED);
+ // Push the update in the store
+ providerService.pushMeterMetrics(meter.deviceId(), Collections.singletonList(meter));
+ } else {
+ providerService.pushMeterMetrics(meter.deviceId(), Collections.emptyList());
+ }
+ }
+
+ /**
+ * Test add meter.
+ */
@Test
- public void testAddition() {
+ public void testAdd() {
+ // Init store
+ initMeterStore();
+ // Submit meter request
manager.submit(m1Request.add());
-
+ // Verify add
assertTrue("The meter was not added", manager.getAllMeters().size() == 1);
-
- assertThat(manager.getMeter(did("1"), MeterId.meterId(1)), is(m1));
+ assertTrue("The meter was not added", manager.getMeters(did1).size() == 1);
+ // Get Meter
+ Meter installingMeter = manager.getMeter(did1, mid1);
+ // Verify add of installingMeter and pending add state
+ assertThat(installingMeter, is(m1));
+ // Verify pending add state
+ assertThat(installingMeter.state(), is(MeterState.PENDING_ADD));
+ // Let's simulate a working data-plane
+ pushMetrics(MeterOperation.Type.ADD, installingMeter);
+ // Get meter
+ Meter installedMeter = manager.getMeter(did1, mid1);
+ // Verify installation
+ assertThat(installedMeter.state(), is(MeterState.ADDED));
+ assertTrue("The meter was not installed", manager.getAllMeters().size() == 1);
+ assertTrue("The meter was not installed", manager.getMeters(did1).size() == 1);
}
+ /**
+ * Test remove meter.
+ */
@Test
public void testRemove() {
+ // Init store
+ initMeterStore();
+ // Submit meter request
manager.submit(m1Request.add());
+ // Withdraw meter
manager.withdraw(m1Request.remove(), m1.id());
-
- assertThat(manager.getMeter(did("1"), MeterId.meterId(1)).state(),
- is(MeterState.PENDING_REMOVE));
-
- providerService.pushMeterMetrics(m1.deviceId(), Collections.emptyList());
-
+ // Get Meter
+ Meter withdrawingMeter = manager.getMeter(did1, mid1);
+ // Verify withdrawing
+ assertThat(withdrawingMeter.state(), is(MeterState.PENDING_REMOVE));
+ assertTrue("The meter was not withdrawn", manager.getAllMeters().size() == 1);
+ assertTrue("The meter was not withdrawn", manager.getMeters(did1).size() == 1);
+ // Let's simulate a working data-plane
+ pushMetrics(MeterOperation.Type.REMOVE, withdrawingMeter);
+ // Verify withdrawn
+ assertNull(manager.getMeter(did1, mid1));
assertTrue("The meter was not removed", manager.getAllMeters().size() == 0);
-
+ assertTrue("The meter was not removed", manager.getMeters(did1).size() == 0);
}
+ /**
+ * Test add multiple device.
+ */
@Test
- public void testMultipleDevice() {
+ public void testAddMultipleDevice() {
+ // Init store
+ initMeterStore();
+ // Submit meter 1
manager.submit(m1Request.add());
+ // Submit meter 2
manager.submit(m2Request.add());
-
- assertTrue("The meters were not added", manager.getAllMeters().size() == 2);
-
- assertThat(manager.getMeter(did("1"), MeterId.meterId(1)), is(m1));
- assertThat(manager.getMeter(did("2"), MeterId.meterId(1)), is(m2));
+ // Verify add
+ assertTrue("The meter was not added", manager.getAllMeters().size() == 2);
+ assertTrue("The meter was not added", manager.getMeters(did1).size() == 1);
+ assertTrue("The meter was not added", manager.getMeters(did2).size() == 1);
+ // Get Meters
+ Meter installingMeter1 = manager.getMeter(did1, mid1);
+ Meter installingMeter2 = manager.getMeter(did2, mid1);
+ // Verify add of installingMeter
+ assertThat(installingMeter1, is(m1));
+ assertThat(installingMeter2, is(m2));
+ // Verify pending add state
+ assertThat(installingMeter1.state(), is(MeterState.PENDING_ADD));
+ assertThat(installingMeter2.state(), is(MeterState.PENDING_ADD));
+ // Let's simulate a working data-plane
+ pushMetrics(MeterOperation.Type.ADD, installingMeter1);
+ pushMetrics(MeterOperation.Type.ADD, installingMeter2);
+ // Get meter
+ Meter installedMeter1 = manager.getMeter(did1, mid1);
+ Meter installedMeter2 = manager.getMeter(did2, mid1);
+ // Verify installation
+ assertThat(installedMeter1.state(), is(MeterState.ADDED));
+ assertThat(installedMeter2.state(), is(MeterState.ADDED));
+ assertTrue("The meter was not installed", manager.getAllMeters().size() == 2);
+ assertTrue("The meter was not installed", manager.getMeters(did1).size() == 1);
+ assertTrue("The meter was not installed", manager.getMeters(did2).size() == 1);
}
+ /**
+ * Test remove meter.
+ */
@Test
- public void testMeterFeatures() {
- assertEquals(meterStore.getMaxMeters(MeterFeaturesKey.key(did("1"))), 255L);
- assertEquals(meterStore.getMaxMeters(MeterFeaturesKey.key(did("2"))), 2);
- }
-
- @Test
- public void testMeterReuse() {
+ public void testRemoveMultipleDevice() {
+ // Init store
+ initMeterStore();
+ // Submit meter 1
manager.submit(m1Request.add());
- manager.submit(m3Request.add());
- Collection<Meter> meters = manager.getMeters(did("1"));
- Meter m = meters.iterator().next();
- MeterRequest mr = DefaultMeterRequest.builder()
- .forDevice(m.deviceId())
- .fromApp(m.appId())
- .withBands(m.bands())
- .withUnit(m.unit())
- .remove();
- manager.withdraw(mr, m.id());
- mr = DefaultMeterRequest.builder()
- .forDevice(m.deviceId())
- .fromApp(m.appId())
- .withBands(m.bands())
- .withUnit(m.unit())
- .add();
- Meter meter = manager.submit(mr);
- assertTrue("Meter id not reused", m.id().equals(meter.id()));
-
+ // Submit meter 2
+ manager.submit(m2Request.add());
+ // Withdraw meter
+ manager.withdraw(m1Request.remove(), m1.id());
+ // Withdraw meter
+ manager.withdraw(m2Request.remove(), m2.id());
+ // Get Meters
+ Meter withdrawingMeter1 = manager.getMeter(did1, mid1);
+ Meter withdrawingMeter2 = manager.getMeter(did2, mid1);
+ // Verify withdrawing
+ assertThat(withdrawingMeter1.state(), is(MeterState.PENDING_REMOVE));
+ assertThat(withdrawingMeter2.state(), is(MeterState.PENDING_REMOVE));
+ assertTrue("The meter was not withdrawn", manager.getAllMeters().size() == 2);
+ assertTrue("The meter was not withdrawn", manager.getMeters(did1).size() == 1);
+ assertTrue("The meter was not withdrawn", manager.getMeters(did2).size() == 1);
+ // Let's simulate a working data-plane
+ pushMetrics(MeterOperation.Type.REMOVE, withdrawingMeter1);
+ pushMetrics(MeterOperation.Type.REMOVE, withdrawingMeter2);
+ // Verify withdrawn
+ assertNull(manager.getMeter(did1, mid1));
+ assertNull(manager.getMeter(did2, mid1));
+ assertTrue("The meter was not removed", manager.getAllMeters().size() == 0);
+ assertTrue("The meter was not removed", manager.getMeters(did1).size() == 0);
+ assertTrue("The meter was not removed", manager.getMeters(did2).size() == 0);
}
+ // Test cluster service
+ private final class TestClusterService extends ClusterServiceAdapter {
+ private ControllerNode local = new DefaultControllerNode(NID_LOCAL, LOCALHOST);
- public class TestApplicationId extends DefaultApplicationId {
- public TestApplicationId(int id, String name) {
- super(id, name);
+ @Override
+ public ControllerNode getLocalNode() {
+ return local;
}
+
+ @Override
+ public Set<ControllerNode> getNodes() {
+ return Sets.newHashSet();
+ }
+
}
private class TestProvider extends AbstractProvider implements MeterProvider {
@@ -272,30 +367,16 @@
@Override
public void performMeterOperation(DeviceId deviceId, MeterOperation meterOp) {
- meters.put(meterOp.meter().id(), meterOp.meter());
+ //Currently unused
}
}
- private final class TestClusterService extends ClusterServiceAdapter {
-
- ControllerNode local = new DefaultControllerNode(NID_LOCAL, LOCALHOST);
-
- @Override
- public ControllerNode getLocalNode() {
- return local;
- }
-
- @Override
- public Set<ControllerNode> getNodes() {
- return Sets.newHashSet();
- }
-
- }
-
- private class TestMastershipService extends MastershipServiceAdapter {
+ // Test mastership service
+ private final class TestMastershipService extends MastershipServiceAdapter {
@Override
public NodeId getMasterFor(DeviceId deviceId) {
return NID_LOCAL;
}
}
+
}
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
index 5c51478..2903663 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
@@ -16,7 +16,9 @@
package org.onosproject.incubator.store.meter.impl;
import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -27,6 +29,9 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.MeterQuery;
+import org.onosproject.net.driver.DriverHandler;
+import org.onosproject.net.driver.DriverService;
import org.onosproject.net.meter.Band;
import org.onosproject.net.meter.DefaultBand;
import org.onosproject.net.meter.DefaultMeter;
@@ -44,8 +49,12 @@
import org.onosproject.net.meter.MeterStoreDelegate;
import org.onosproject.net.meter.MeterStoreResult;
import org.onosproject.store.AbstractStore;
+import org.onosproject.store.primitives.DefaultDistributedSet;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AtomicCounterMap;
import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedPrimitive;
+import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
@@ -54,12 +63,13 @@
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
-import java.util.Arrays;
-import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import static org.onosproject.incubator.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
import static org.slf4j.LoggerFactory.getLogger;
@@ -77,6 +87,7 @@
private static final String METERSTORE = "onos-meter-store";
private static final String METERFEATURESSTORE = "onos-meter-features-store";
private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
+ private static final String METERIDSTORE = "onos-meters-id-store";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private StorageService storageService;
@@ -87,6 +98,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DriverService driverService;
+
private ConsistentMap<MeterKey, MeterData> meters;
private NodeId local;
@@ -97,7 +111,27 @@
private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
Maps.newConcurrentMap();
- private ConsistentMap<DeviceId, BitSet> availableMeterIds;
+ // Available meter identifiers
+ private DistributedSet<MeterKey> availableMeterIds;
+
+ // Atomic counter map for generation of new identifiers;
+ private AtomicCounterMap<DeviceId> meterIdGenerators;
+
+ /**
+ * Defines possible selection strategies to reuse meter ids.
+ */
+ enum ReuseStrategy {
+ /**
+ * Select randomly an available id.
+ */
+ RANDOM,
+ /**
+ * Select the first one.
+ */
+ FIRST_FIT
+ }
+
+ private ReuseStrategy reuseStrategy = FIRST_FIT;
@Activate
public void activate() {
@@ -105,7 +139,7 @@
meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
.withName(METERSTORE)
- .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
+ .withSerializer(Serializer.using(KryoNamespaces.API,
MeterKey.class,
MeterData.class,
DefaultMeter.class,
@@ -119,16 +153,24 @@
meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
.withName(METERFEATURESSTORE)
- .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
- MeterFeaturesKey.class,
- MeterFeatures.class,
- DefaultMeterFeatures.class,
- Band.Type.class,
- Meter.Unit.class,
- MeterFailReason.class)).build();
+ .withSerializer(Serializer.using(KryoNamespaces.API,
+ MeterFeaturesKey.class,
+ MeterFeatures.class,
+ DefaultMeterFeatures.class,
+ Band.Type.class,
+ Meter.Unit.class,
+ MeterFailReason.class)).build();
- availableMeterIds = storageService.<DeviceId, BitSet>consistentMapBuilder()
+ // Init the set of the available ids
+ availableMeterIds = new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
.withName(AVAILABLEMETERIDSTORE)
+ .withSerializer(Serializer.using(KryoNamespaces.API,
+ MeterKey.class)).build(),
+ DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
+
+ // Init atomic map counters
+ meterIdGenerators = storageService.<DeviceId>atomicCounterMapBuilder()
+ .withName(METERIDSTORE)
.withSerializer(Serializer.using(KryoNamespaces.API)).build();
log.info("Started");
@@ -140,67 +182,45 @@
log.info("Stopped");
}
- private void updateMeterIdAvailability(DeviceId deviceId, MeterId id,
- boolean available) {
- availableMeterIds.compute(deviceId, (k, v) -> {
- v = v == null ? new BitSet() : v;
- v.set(id.id().intValue(), available);
- return v;
- });
- }
-
- @Override
- public MeterId firstReusableMeterId(DeviceId deviceId) {
- Versioned<BitSet> bitSetVersioned = availableMeterIds.get(deviceId);
- if (bitSetVersioned == null) {
- return null;
- }
- BitSet value = bitSetVersioned.value();
- int nextSetBit = value.nextSetBit(1);
- if (nextSetBit < 0) {
- return null;
- }
- return MeterId.meterId(nextSetBit);
- }
-
@Override
public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
+ // Init steps
CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
- updateMeterIdAvailability(meter.deviceId(), meter.id(), false);
+ // Store the future related to the operation
futures.put(key, future);
+ // Store the meter data
MeterData data = new MeterData(meter, null, local);
-
try {
meters.put(key, data);
} catch (StorageException e) {
future.completeExceptionally(e);
}
-
+ // Done, return the future
return future;
-
}
@Override
public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
+ // Init steps
CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+ // Store the future related to the operation
futures.put(key, future);
-
+ // Create the meter data
MeterData data = new MeterData(meter, null, local);
-
- // update the state of the meter. It will be pruned by observing
+ // Update the state of the meter. It will be pruned by observing
// that it has been removed from the dataplane.
try {
+ // If it does not exist in the system
if (meters.computeIfPresent(key, (k, v) -> data) == null) {
+ // Complete immediately
future.complete(MeterStoreResult.success());
}
- updateMeterIdAvailability(meter.deviceId(), meter.id(), true);
} catch (StorageException e) {
future.completeExceptionally(e);
}
-
-
+ // Done, return the future
return future;
}
@@ -289,9 +309,15 @@
@Override
public void deleteMeterNow(Meter m) {
+ // Create the key
MeterKey key = MeterKey.key(m.deviceId(), m.id());
+ // Remove the future
futures.remove(key);
+ // Remove the meter
meters.remove(key);
+ // Free the id
+ freeMeterId(m.deviceId(), m.id());
+ // Finally notify the delegate
notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, m));
}
@@ -301,6 +327,108 @@
return features == null ? 0L : features.maxMeter();
}
+ // queryMaxMeters is implemented in FullMetersAvailable behaviour.
+ private long queryMaxMeters(DeviceId device) {
+ // Get driver handler for this device
+ DriverHandler handler = driverService.createHandler(device);
+ // If creation failed or the device does not have this behavior
+ if (handler == null || !handler.hasBehaviour(MeterQuery.class)) {
+ // We cannot know max meter
+ return 0L;
+ }
+ // Get the behavior
+ MeterQuery query = handler.behaviour(MeterQuery.class);
+ // Return as max meter the result of the query
+ return query.getMaxMeters();
+ }
+
+ private boolean updateMeterIdAvailability(DeviceId deviceId, MeterId id,
+ boolean available) {
+ // According to available, make available or unavailable a meter key
+ return available ? availableMeterIds.add(MeterKey.key(deviceId, id)) :
+ availableMeterIds.remove(MeterKey.key(deviceId, id));
+ }
+
+ private MeterId getNextAvailableId(Set<MeterId> availableIds) {
+ // If there are no available ids
+ if (availableIds.isEmpty()) {
+ // Just end the cycle
+ return null;
+ }
+ // If it is the first fit
+ if (reuseStrategy == FIRST_FIT || availableIds.size() == 1) {
+ return availableIds.iterator().next();
+ }
+ // If it is random, get the size
+ int size = availableIds.size();
+ // Return a random element
+ return Iterables.get(availableIds, RandomUtils.nextInt(size));
+ }
+
+ // Implements reuse strategy
+ private MeterId firstReusableMeterId(DeviceId deviceId) {
+ // Filter key related to device id, and reduce to meter ids
+ Set<MeterId> localAvailableMeterIds = availableMeterIds.stream()
+ .filter(meterKey -> meterKey.deviceId().equals(deviceId))
+ .map(MeterKey::meterId)
+ .collect(Collectors.toSet());
+ // Get next available id
+ MeterId meterId = getNextAvailableId(localAvailableMeterIds);
+ // Iterate until there are items
+ while (meterId != null) {
+ // If we are able to reserve the id
+ if (updateMeterIdAvailability(deviceId, meterId, false)) {
+ // Just end
+ return meterId;
+ }
+ // Update the set
+ localAvailableMeterIds.remove(meterId);
+ // Try another time
+ meterId = getNextAvailableId(localAvailableMeterIds);
+ }
+ // No reusable ids
+ return null;
+ }
+
+ @Override
+ public MeterId allocateMeterId(DeviceId deviceId) {
+ // Init steps
+ MeterId meterId;
+ long id;
+ // Try to reuse meter id
+ meterId = firstReusableMeterId(deviceId);
+ // We found a reusable id, return
+ if (meterId != null) {
+ return meterId;
+ }
+ // If there was no reusable MeterId we have to generate a new value
+ // using maxMeters as upper limit.
+ long maxMeters = getMaxMeters(MeterFeaturesKey.key(deviceId));
+ // If the device does not give us MeterFeatures
+ if (maxMeters == 0L) {
+ // MeterFeatures couldn't be retrieved, fallback to queryMeters.
+ maxMeters = queryMaxMeters(deviceId);
+ }
+ // If we don't know the max, cannot proceed
+ if (maxMeters == 0L) {
+ return null;
+ }
+ // Get a new value
+ id = meterIdGenerators.incrementAndGet(deviceId);
+ // Check with the max, and if the value is bigger, cannot proceed
+ if (id >= maxMeters) {
+ return null;
+ }
+ // Done, return the value
+ return MeterId.meterId(id);
+ }
+
+ @Override
+ public void freeMeterId(DeviceId deviceId, MeterId meterId) {
+ // Update the availability
+ updateMeterIdAvailability(deviceId, meterId, true);
+ }
+
private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
@Override
public void event(MapEvent<MeterKey, MeterData> event) {
diff --git a/incubator/store/src/test/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStoreTest.java b/incubator/store/src/test/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStoreTest.java
new file mode 100644
index 0000000..94940c4
--- /dev/null
+++ b/incubator/store/src/test/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStoreTest.java
@@ -0,0 +1,486 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.store.meter.impl;
+
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.junit.TestUtils;
+import org.onlab.packet.IpAddress;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipServiceAdapter;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.MeterQuery;
+import org.onosproject.net.driver.Behaviour;
+import org.onosproject.net.driver.Driver;
+import org.onosproject.net.driver.DriverData;
+import org.onosproject.net.driver.DriverHandler;
+import org.onosproject.net.driver.DriverServiceAdapter;
+import org.onosproject.net.meter.Band;
+import org.onosproject.net.meter.DefaultBand;
+import org.onosproject.net.meter.DefaultMeter;
+import org.onosproject.net.meter.DefaultMeterFeatures;
+import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterFeatures;
+import org.onosproject.net.meter.MeterFeaturesKey;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterKey;
+import org.onosproject.net.meter.MeterState;
+import org.onosproject.store.service.TestStorageService;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.*;
+import static org.onosproject.net.NetTestTools.APP_ID;
+import static org.onosproject.net.NetTestTools.did;
+
+/**
+ * Meter store tests.
+ */
+public class DistributedMeterStoreTest {
+
+ // Test node id
+ private static final NodeId NID_LOCAL = new NodeId("local");
+
+ // Test ip address
+ private static final IpAddress LOCALHOST = IpAddress.valueOf("127.0.0.1");
+
+ // Store under testing
+ private DistributedMeterStore meterStore;
+
+ // Device ids used during the tests
+ private DeviceId did1 = did("1");
+ private DeviceId did2 = did("2");
+ private DeviceId did3 = did("3");
+ private DeviceId did4 = did("4");
+
+ // Meter ids used during the tests
+ private MeterId mid1 = MeterId.meterId(1);
+ private MeterId mid2 = MeterId.meterId(2);
+
+ // Bands used during the tests
+ private Band b1 = DefaultBand.builder()
+ .ofType(Band.Type.DROP)
+ .withRate(500)
+ .build();
+
+ // Meters used during the tests
+ private Meter m1 = DefaultMeter.builder()
+ .forDevice(did1)
+ .fromApp(APP_ID)
+ .withId(mid1)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+
+ // Meter features used during the tests
+ private MeterFeatures mef1 = DefaultMeterFeatures.builder().forDevice(did1)
+ .withMaxMeters(3L)
+ .withBandTypes(new HashSet<>())
+ .withUnits(new HashSet<>())
+ .hasStats(false)
+ .hasBurst(false)
+ .withMaxBands((byte) 0)
+ .withMaxColors((byte) 0)
+ .build();
+ private MeterFeatures mef2 = DefaultMeterFeatures.builder().forDevice(did2)
+ .withMaxMeters(10L)
+ .withBandTypes(new HashSet<>())
+ .withUnits(new HashSet<>())
+ .hasStats(false)
+ .hasBurst(false)
+ .withMaxBands((byte) 0)
+ .withMaxColors((byte) 0)
+ .build();
+
+ @Before
+ public void setup() {
+ // Init step
+ meterStore = new DistributedMeterStore();
+ // Let's initialize some internal services
+ TestUtils.setField(meterStore, "storageService", new TestStorageService());
+ TestUtils.setField(meterStore, "clusterService", new TestClusterService());
+ TestUtils.setField(meterStore, "mastershipService", new TestMastershipService());
+ TestUtils.setField(meterStore, "driverService", new TestDriverService());
+ // Activate the store
+ meterStore.activate();
+ }
+
+ @After
+ public void tearDown() {
+ // Deactivate the store
+ meterStore.deactivate();
+ }
+
+ private void initMeterStore() {
+ // Let's store feature for device 1
+ meterStore.storeMeterFeatures(mef1);
+ // Let's store feature for device 2
+ meterStore.storeMeterFeatures(mef2);
+ }
+
+ /**
+ * Test proper store of meter features.
+ */
+ @Test
+ public void testStoreMeterFeatures() {
+ // Let's store feature for device 1
+ meterStore.storeMeterFeatures(mef1);
+ // Verify store meter features
+ assertThat(meterStore.getMaxMeters(MeterFeaturesKey.key(did1)), is(3L));
+ // Let's store feature for device 1
+ meterStore.storeMeterFeatures(mef2);
+ // Verify store meter features
+ assertThat(meterStore.getMaxMeters(MeterFeaturesKey.key(did2)), is(10L));
+ }
+
+ /**
+ * Test proper delete of meter features.
+ */
+ @Test
+ public void testDeleteMeterFeatures() {
+ // Let's store feature for device 1
+ meterStore.storeMeterFeatures(mef1);
+ // Verify store meter features
+ assertThat(meterStore.getMaxMeters(MeterFeaturesKey.key(did1)), is(3L));
+ // Let's delete the features
+ meterStore.deleteMeterFeatures(did1);
+ // Verify delete meter features
+ assertThat(meterStore.getMaxMeters(MeterFeaturesKey.key(did1)), is(0L));
+ }
+
+ /**
+ * Test proper allocation of meter ids.
+ */
+ @Test
+ public void testAllocateId() {
+ // Init the store
+ initMeterStore();
+ // Allocate a meter id and verify is equal to mid1
+ assertThat(mid1, is(meterStore.allocateMeterId(did1)));
+ // Allocate a meter id and verify is equal to mid2
+ assertThat(mid2, is(meterStore.allocateMeterId(did1)));
+ }
+
+ /**
+ * Test proper free of meter ids.
+ */
+ @Test
+ public void testFreeId() {
+ // Init the store
+ initMeterStore();
+ // Allocate a meter id and verify is equal to mid1
+ assertThat(mid1, is(meterStore.allocateMeterId(did1)));
+ // Free the above id
+ meterStore.freeMeterId(did1, mid1);
+ // Allocate a meter id and verify is equal to mid1
+ assertThat(mid1, is(meterStore.allocateMeterId(did1)));
+ }
+
+ /**
+ * Test proper reuse of meter ids.
+ */
+ @Test
+ public void testReuseId() {
+ // Init the store
+ initMeterStore();
+ // Reserve id 1
+ MeterId meterIdOne = meterStore.allocateMeterId(did2);
+ // Free the above id
+ meterStore.freeMeterId(did2, meterIdOne);
+ // Start an async reservation
+ CompletableFuture<MeterId> future = CompletableFuture.supplyAsync(
+ () -> meterStore.allocateMeterId(did2)
+ );
+ // Start another reservation
+ MeterId meterIdTwo = meterStore.allocateMeterId(did2);
+ try {
+ meterIdOne = future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ }
+ // Ids should be different, otherwise we had clash in the store
+ assertNotEquals("Ids should be different", meterIdOne, meterIdTwo);
+
+ // Free the above id
+ meterStore.freeMeterId(did1, meterIdOne);
+ // Free the above id
+ meterStore.freeMeterId(did1, meterIdTwo);
+ // Reserve id 1
+ meterIdOne = meterStore.allocateMeterId(did2);
+ // Reserve id 2
+ meterStore.allocateMeterId(did2);
+ // Reserve id 3
+ MeterId meterIdThree = meterStore.allocateMeterId(did2);
+ // Reserve id 4
+ MeterId meterIdFour = meterStore.allocateMeterId(did2);
+ // Free the above id
+ meterStore.freeMeterId(did1, meterIdOne);
+ // Free the above id
+ meterStore.freeMeterId(did1, meterIdThree);
+ // Free the above id
+ meterStore.freeMeterId(did1, meterIdFour);
+ // Start an async reservation
+ future = CompletableFuture.supplyAsync(
+ () -> meterStore.allocateMeterId(did2)
+ );
+ // Start another reservation
+ MeterId meterAnotherId = meterStore.allocateMeterId(did2);
+ try {
+ meterAnotherId = future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ }
+ // Ids should be different, otherwise we had clash in the store
+ assertNotEquals("Ids should be different", meterAnotherId, meterIdOne);
+ }
+
+ /**
+ * Test query meters mechanism.
+ */
+ @Test
+ public void testQueryMeters() {
+ // Init the store
+ initMeterStore();
+ // Let's test queryMeters
+ assertThat(mid1, is(meterStore.allocateMeterId(did3)));
+ // Let's test queryMeters error
+ assertNull(meterStore.allocateMeterId(did4));
+ }
+
+ /**
+ * Test max meter error.
+ */
+ @Test
+ public void testMaxMeterError() {
+ // Init the store
+ initMeterStore();
+ // Reserve id 1
+ assertThat(mid1, is(meterStore.allocateMeterId(did1)));
+ // Reserve id 2
+ assertThat(mid2, is(meterStore.allocateMeterId(did1)));
+ // Max meter error
+ assertNull(meterStore.allocateMeterId(did1));
+ }
+
+ /**
+ * Test store meter.
+ */
+ @Test
+ public void testStoreMeter() {
+ // Init the store
+ initMeterStore();
+ // Simulate the allocation of an id
+ MeterId idOne = meterStore.allocateMeterId(did1);
+ // Verify the allocation
+ assertThat(mid1, is(idOne));
+ // Let's create a meter
+ Meter meterOne = DefaultMeter.builder()
+ .forDevice(did1)
+ .fromApp(APP_ID)
+ .withId(mid1)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+ // Set the state
+ ((DefaultMeter) meterOne).setState(MeterState.PENDING_ADD);
+ // Store the meter
+ meterStore.storeMeter(meterOne);
+ // Let's create meter key
+ MeterKey meterKey = MeterKey.key(did1, mid1);
+ // Verify the store
+ assertThat(1, is(meterStore.getAllMeters().size()));
+ assertThat(1, is(meterStore.getAllMeters(did1).size()));
+ assertThat(m1, is(meterStore.getMeter(meterKey)));
+ }
+
+ /**
+ * Test delete meter.
+ */
+ @Test
+ public void testDeleteMeter() {
+ // Init the store
+ initMeterStore();
+ // Simulate the allocation of an id
+ MeterId idOne = meterStore.allocateMeterId(did1);
+ // Verify the allocation
+ assertThat(mid1, is(idOne));
+ // Let's create a meter
+ Meter meterOne = DefaultMeter.builder()
+ .forDevice(did1)
+ .fromApp(APP_ID)
+ .withId(mid1)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+ // Set the state
+ ((DefaultMeter) meterOne).setState(MeterState.PENDING_ADD);
+ // Store the meter
+ meterStore.storeMeter(meterOne);
+ // Set the state
+ ((DefaultMeter) meterOne).setState(MeterState.PENDING_REMOVE);
+ // Let's create meter key
+ MeterKey meterKey = MeterKey.key(did1, mid1);
+ // Delete meter
+ meterStore.deleteMeter(meterOne);
+ // Start an async delete, simulating the operation of the provider
+ CompletableFuture<Void> future = CompletableFuture.runAsync(
+ () -> meterStore.deleteMeterNow(meterOne)
+ );
+ // Let's wait
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ }
+ // Verify delete
+ assertThat(0, is(meterStore.getAllMeters().size()));
+ assertThat(0, is(meterStore.getAllMeters(did1).size()));
+ assertNull(meterStore.getMeter(meterKey));
+ assertThat(mid1, is(meterStore.allocateMeterId(did1)));
+ }
+
+ /**
+ * Test no delete meter.
+ */
+ @Test
+ public void testNoDeleteMeter() {
+ // Init the store
+ initMeterStore();
+ // Simulate the allocation of an id
+ MeterId idOne = meterStore.allocateMeterId(did1);
+ // Create the key
+ MeterKey keyOne = MeterKey.key(did1, idOne);
+ // Let's create a meter
+ Meter meterOne = DefaultMeter.builder()
+ .forDevice(did1)
+ .fromApp(APP_ID)
+ .withId(mid1)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+ // Set the state
+ ((DefaultMeter) meterOne).setState(MeterState.PENDING_REMOVE);
+ // Delete meter
+ meterStore.deleteMeter(meterOne);
+ // Verify No delete
+ assertThat(0, is(meterStore.getAllMeters().size()));
+ assertThat(0, is(meterStore.getAllMeters(did1).size()));
+ assertNull(meterStore.getMeter(keyOne));
+ }
+
+ // Test cluster service
+ private final class TestClusterService extends ClusterServiceAdapter {
+
+ private ControllerNode local = new DefaultControllerNode(NID_LOCAL, LOCALHOST);
+
+ @Override
+ public ControllerNode getLocalNode() {
+ return local;
+ }
+
+ @Override
+ public Set<ControllerNode> getNodes() {
+ return Sets.newHashSet();
+ }
+
+ }
+
+ // Test mastership service
+ private final class TestMastershipService extends MastershipServiceAdapter {
+ @Override
+ public NodeId getMasterFor(DeviceId deviceId) {
+ return NID_LOCAL;
+ }
+ }
+
+ // Test class for driver service.
+ private class TestDriverService extends DriverServiceAdapter {
+ @Override
+ public DriverHandler createHandler(DeviceId deviceId, String... credentials) {
+ return deviceId.equals(did3) ? new TestDriverHandler() : null;
+ }
+ }
+
+ // Test class for driver handler.
+ private class TestDriverHandler implements DriverHandler {
+
+ @Override
+ public Driver driver() {
+ return null;
+ }
+
+ @Override
+ public DriverData data() {
+ return null;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T extends Behaviour> T behaviour(Class<T> behaviourClass) {
+ return (T) new TestMeterQuery();
+ }
+
+ @Override
+ public <T> T get(Class<T> serviceClass) {
+ return null;
+ }
+
+ @Override
+ public boolean hasBehaviour(Class<? extends Behaviour> behaviourClass) {
+ return true;
+ }
+ }
+
+ // Test meter query
+ private class TestMeterQuery implements MeterQuery {
+
+ @Override
+ public DriverData data() {
+ return null;
+ }
+
+ @Override
+ public void setData(DriverData data) {
+
+ }
+ @Override
+ public DriverHandler handler() {
+ return null;
+ }
+
+ @Override
+ public void setHandler(DriverHandler handler) {
+
+ }
+
+ @Override
+ public long getMaxMeters() {
+ return 100;
+ }
+ }
+
+}