[SDFAB-500][SDFAB-499] Implement user defined index mode for the meter service
- Introduce a boolean to control the meter service modes
- User defined mode does not provide any coordination to the apps
- Only one mode can be active at time
- In addition some sanity checks are peformed by the meter service
- Update existing unit tests and add new ones to test the new behaviors
- Initial clean up of the meters subsystems
Change-Id: I61500b794f27e94abd11637c84bce0dbb2e073f3
diff --git a/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java b/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
index 811b418..7f4de4b 100644
--- a/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
+++ b/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
@@ -86,16 +86,6 @@
MeterStoreResult deleteMeterFeatures(Collection<MeterFeatures> meterfeatures);
/**
- * Updates a meter whose meter id is the same as the passed meter.
- *
- * @param meter a new meter
- * @return a future indicating the result of the store operation
- * @deprecated in onos-2.5 replaced by {@link #addOrUpdateMeter(Meter)}
- */
- @Deprecated
- CompletableFuture<MeterStoreResult> updateMeter(Meter meter);
-
- /**
* Updates a given meter's state with the provided state.
*
* @param meter a meter
@@ -200,10 +190,20 @@
* This API is typically used when the device is offline.
*
* @param deviceId the device id
+ * @deprecated in onos-2.5, replaced by {@link #purgeMeters(DeviceId)}
*/
+ @Deprecated
void purgeMeter(DeviceId deviceId);
/**
+ * Removes all meters of given device from store.
+ * This API is typically used when the device is offline.
+ *
+ * @param deviceId the device id
+ */
+ void purgeMeters(DeviceId deviceId);
+
+ /**
* Removes all meters of given device and for the given application from store.
* This API is typically used when the device is offline.
*
@@ -212,4 +212,14 @@
*/
void purgeMeters(DeviceId deviceId, ApplicationId appId);
+ /**
+ * Enables/disables user defined index mode for the store. In this mode users
+ * can provide an index for the meter. Store may reject switching mode requests
+ * at run time if meters were already allocated.
+ *
+ * @param enable to enable/disable the user defined index mode.
+ * @return true if user defined index mode is enabled. False otherwise.
+ */
+ boolean userDefinedIndexMode(boolean enable);
+
}
diff --git a/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java b/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java
index 64b3cc2..ab3096f 100644
--- a/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java
+++ b/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java
@@ -125,6 +125,9 @@
public static final String MM_PURGE_ON_DISCONNECTION = "purgeOnDisconnection";
public static final boolean MM_PURGE_ON_DISCONNECTION_DEFAULT = false;
+ public static final String MM_USER_DEFINED_INDEX = "userDefinedIndex";
+ public static final boolean MM_USER_DEFINED_INDEX_DEFAULT = false;
+
public static final String NRM_ARP_ENABLED = "arpEnabled";
public static final boolean NRM_ARP_ENABLED_DEFAULT = true;
diff --git a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
index 9b786e5..b2e6730 100644
--- a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
+++ b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
@@ -71,6 +71,7 @@
import java.util.Objects;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.PredictableExecutor.newPredictableExecutor;
@@ -82,6 +83,8 @@
import static org.onosproject.net.OsgiPropertyConstants.MM_NUM_THREADS_DEFAULT;
import static org.onosproject.net.OsgiPropertyConstants.MM_PURGE_ON_DISCONNECTION;
import static org.onosproject.net.OsgiPropertyConstants.MM_PURGE_ON_DISCONNECTION_DEFAULT;
+import static org.onosproject.net.OsgiPropertyConstants.MM_USER_DEFINED_INDEX;
+import static org.onosproject.net.OsgiPropertyConstants.MM_USER_DEFINED_INDEX_DEFAULT;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -97,6 +100,7 @@
MM_NUM_THREADS + ":Integer=" + MM_NUM_THREADS_DEFAULT,
MM_FALLBACK_METER_POLL_FREQUENCY + ":Integer=" + MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT,
MM_PURGE_ON_DISCONNECTION + ":Boolean=" + MM_PURGE_ON_DISCONNECTION_DEFAULT,
+ MM_USER_DEFINED_INDEX + ":Boolean=" + MM_USER_DEFINED_INDEX_DEFAULT,
}
)
public class MeterManager
@@ -142,6 +146,9 @@
/** Purge entries associated with a device when the device goes offline. */
private boolean purgeOnDisconnection = MM_PURGE_ON_DISCONNECTION_DEFAULT;
+ /** Enable user defined index mode. Users can provide their own meter index. */
+ protected boolean userDefinedIndex = MM_USER_DEFINED_INDEX_DEFAULT;
+
// Action triggered when the futures related to submit and withdrawal complete
private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
@@ -221,6 +228,11 @@
purgeOnDisconnection ? "enabled" : "disabled");
}
+ flag = Tools.isPropertyEnabled(properties, MM_USER_DEFINED_INDEX);
+ boolean enable = flag == null ? userDefinedIndex : flag;
+ userDefinedIndex = store.userDefinedIndexMode(enable);
+ log.info("UserDefinedIndex is {}", userDefinedIndex ? "enabled" : "disabled");
+
String s = get(properties, MM_FALLBACK_METER_POLL_FREQUENCY);
try {
fallbackMeterPollFrequency = isNullOrEmpty(s) ?
@@ -257,6 +269,7 @@
checkNotNull(request, "request cannot be null.");
MeterCellId cellId;
if (request.index().isPresent()) {
+ checkArgument(userDefinedIndex, "Index cannot be provided when userDefinedIndex mode is disabled");
// User provides index
if (request.scope().isGlobal()) {
cellId = MeterId.meterId(request.index().get());
@@ -265,6 +278,7 @@
PiMeterId.of(request.scope().id()), request.index().get());
}
} else {
+ checkArgument(!userDefinedIndex, "Index cannot be allocated when userDefinedIndex mode is enabled");
// Allocate an id
cellId = allocateMeterId(request.deviceId(), request.scope());
}
diff --git a/core/net/src/test/java/org/onosproject/net/meter/impl/MeterManagerTest.java b/core/net/src/test/java/org/onosproject/net/meter/impl/MeterManagerTest.java
index 09e9774..1510e35 100644
--- a/core/net/src/test/java/org/onosproject/net/meter/impl/MeterManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/meter/impl/MeterManagerTest.java
@@ -57,6 +57,7 @@
import org.onosproject.net.meter.DefaultMeterFeatures;
import org.onosproject.net.meter.DefaultMeterRequest;
import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterCellId;
import org.onosproject.net.meter.MeterFeatures;
import org.onosproject.net.meter.MeterId;
import org.onosproject.net.meter.MeterOperation;
@@ -66,9 +67,12 @@
import org.onosproject.net.meter.MeterProviderRegistry;
import org.onosproject.net.meter.MeterProviderService;
import org.onosproject.net.meter.MeterRequest;
+import org.onosproject.net.meter.MeterScope;
import org.onosproject.net.meter.MeterService;
import org.onosproject.net.meter.MeterState;
import org.onosproject.net.pi.PiPipeconfServiceAdapter;
+import org.onosproject.net.pi.model.PiMeterId;
+import org.onosproject.net.pi.runtime.PiMeterCellId;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.meter.impl.DistributedMeterStore;
@@ -88,6 +92,7 @@
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -97,6 +102,7 @@
import static org.onosproject.net.NetTestTools.APP_ID;
import static org.onosproject.net.NetTestTools.did;
import static org.onosproject.net.NetTestTools.injectEventDispatcher;
+import static org.onosproject.net.OsgiPropertyConstants.MM_USER_DEFINED_INDEX;
/**
* Meter manager tests.
@@ -150,6 +156,7 @@
// Meter ids used during the tests
private MeterId mid1 = MeterId.meterId(1);
+ private MeterCellId cid0 = PiMeterCellId.ofIndirect(PiMeterId.of("foo"), 0L);
// Bands used during the tests
private static Band b1 = DefaultBand.builder()
@@ -180,6 +187,21 @@
.withUnit(Meter.Unit.KB_PER_SEC)
.withBands(Collections.singletonList(b1))
.build();
+ private static Meter mProgrammable2 = DefaultMeter.builder()
+ .forDevice(PROGRAMMABLE_DID)
+ .fromApp(APP_ID)
+ .withId(MeterId.meterId(2))
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+
+ private static Meter mUserDefined = DefaultMeter.builder()
+ .forDevice(PROGRAMMABLE_DID)
+ .fromApp(APP_ID)
+ .withCellId(PiMeterCellId.ofIndirect(PiMeterId.of("foo"), 0L))
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
// Meter requests used during the tests
private MeterRequest.Builder m1Request = DefaultMeterRequest.builder()
@@ -198,6 +220,19 @@
.fromApp(APP_ID)
.withUnit(Meter.Unit.KB_PER_SEC)
.withBands(Collections.singletonList(b1));
+ private MeterRequest.Builder mProgrammableRequest2 = DefaultMeterRequest.builder()
+ .forDevice(PROGRAMMABLE_DID)
+ .fromApp(APP_ID)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1));
+
+ private MeterRequest.Builder userDefinedRequest = DefaultMeterRequest.builder()
+ .forDevice(PROGRAMMABLE_DID)
+ .fromApp(APP_ID)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .withScope(MeterScope.of("foo"))
+ .withIndex(0L);
// Meter features used during the tests
private MeterFeatures mef1 = DefaultMeterFeatures.builder().forDevice(did1)
@@ -218,7 +253,29 @@
.withMaxBands((byte) 0)
.withMaxColors((byte) 0)
.build();
+ private MeterFeatures programmableMef1 = DefaultMeterFeatures.builder().forDevice(PROGRAMMABLE_DID)
+ .withStartIndex(1)
+ .withEndIndex(10L)
+ .withBandTypes(new HashSet<>())
+ .withUnits(new HashSet<>())
+ .hasStats(false)
+ .hasBurst(false)
+ .withMaxBands((byte) 0)
+ .withMaxColors((byte) 0)
+ .build();
+ private MeterFeatures programmableMef2 = DefaultMeterFeatures.builder().forDevice(PROGRAMMABLE_DID)
+ .withStartIndex(0)
+ .withEndIndex(10L)
+ .withScope(MeterScope.of("foo"))
+ .withBandTypes(new HashSet<>())
+ .withUnits(new HashSet<>())
+ .hasStats(false)
+ .hasBurst(false)
+ .withMaxBands((byte) 0)
+ .withMaxColors((byte) 0)
+ .build();
+ private ComponentContext componentContext = EasyMock.createMock(ComponentContext.class);
@Before
public void setup() {
@@ -263,7 +320,6 @@
// Activate the manager
Dictionary<String, Object> cfgDict = new Hashtable<>();
- ComponentContext componentContext = EasyMock.createMock(ComponentContext.class);
expect(componentContext.getProperties()).andReturn(cfgDict);
replay(componentContext);
manager.activate(componentContext);
@@ -292,20 +348,18 @@
meterStore.deactivate();
}
+ // Store meter features for all the devices
private void initMeterStore() {
- // Let's store feature for device 1
meterStore.storeMeterFeatures(mef1);
- // Let's store feature for device 2
meterStore.storeMeterFeatures(mef2);
+ meterStore.storeMeterFeatures(programmableMef1);
+ meterStore.storeMeterFeatures(programmableMef2);
}
// Emulate metrics coming from the dataplane
private void pushMetrics(MeterOperation.Type type, Meter meter) {
- // If it is an add operation
if (type == MeterOperation.Type.ADD) {
- // Update state to added
((DefaultMeter) meter).setState(MeterState.ADDED);
- // Push the update in the store
providerService.pushMeterMetrics(meter.deviceId(), Collections.singletonList(meter));
} else {
providerService.pushMeterMetrics(meter.deviceId(), Collections.emptyList());
@@ -313,6 +367,42 @@
}
/**
+ * Verify enabling user defined index mode in meter service.
+ */
+ @Test
+ public void testEnableUserDefinedIndex() {
+ reset(componentContext);
+ Dictionary<String, Object> cfgDict = new Hashtable<>();
+ cfgDict.put(MM_USER_DEFINED_INDEX, true);
+ expect(componentContext.getProperties()).andReturn(cfgDict);
+ replay(componentContext);
+
+ Object returnValue = TestUtils.callMethod(manager, "readComponentConfiguration",
+ ComponentContext.class, componentContext);
+ assertNull(returnValue);
+ assertTrue(manager.userDefinedIndex);
+ }
+
+ /**
+ * Verify disabling user defined index mode in meter service.
+ */
+ @Test
+ public void testDisableUserDefinedIndex() {
+ testEnableUserDefinedIndex();
+
+ reset(componentContext);
+ Dictionary<String, Object> cfgDict = new Hashtable<>();
+ cfgDict.put(MM_USER_DEFINED_INDEX, false);
+ expect(componentContext.getProperties()).andReturn(cfgDict);
+ replay(componentContext);
+
+ Object returnValue = TestUtils.callMethod(manager, "readComponentConfiguration",
+ ComponentContext.class, componentContext);
+ assertNull(returnValue);
+ assertFalse(manager.userDefinedIndex);
+ }
+
+ /**
* Test add meter.
*/
@Test
@@ -341,6 +431,49 @@
}
/**
+ * Test add meter with user defined index.
+ */
+ @Test
+ public void testAddWithUserDefinedIndex() {
+ initMeterStore();
+ testEnableUserDefinedIndex();
+
+ manager.submit(userDefinedRequest.add());
+ assertEquals("The meter was not added", 1, manager.getAllMeters().size());
+ assertEquals("The meter was not added", 1, manager.getMeters(PROGRAMMABLE_DID).size());
+ Meter installingMeter = manager.getMeter(PROGRAMMABLE_DID, cid0);
+ assertThat(installingMeter, is(mUserDefined));
+ assertThat(installingMeter.state(), is(MeterState.PENDING_ADD));
+
+ pushMetrics(MeterOperation.Type.ADD, installingMeter);
+ Meter installedMeter = manager.getMeter(PROGRAMMABLE_DID, cid0);
+ assertThat(installedMeter.state(), is(MeterState.ADDED));
+ assertEquals("The meter was not installed", 1, manager.getAllMeters().size());
+ assertEquals("The meter was not installed", 1, manager.getMeters(PROGRAMMABLE_DID).size());
+ }
+
+ /**
+ * Test wrong add meter.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongAdd() {
+ initMeterStore();
+
+ manager.submit(userDefinedRequest.add());
+ }
+
+ /**
+ * Test wrong add meter in user defined index mode.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testWrongAddInUserDefinedIndexMode() {
+ initMeterStore();
+ testEnableUserDefinedIndex();
+
+ manager.submit(m1Request.add());
+ }
+
+ /**
* Test remove meter.
*/
@Test
@@ -366,7 +499,29 @@
}
/**
- * Test add multiple device.
+ * Test remove meter in user defined index mode.
+ */
+ @Test
+ public void testRemoveInUserDefinedIndexMode() {
+ initMeterStore();
+ testEnableUserDefinedIndex();
+
+ manager.submit(userDefinedRequest.add());
+
+ manager.withdraw(userDefinedRequest.remove(), cid0);
+ Meter withdrawingMeter = manager.getMeter(PROGRAMMABLE_DID, cid0);
+ assertThat(withdrawingMeter.state(), is(MeterState.PENDING_REMOVE));
+ assertEquals("The meter was not withdrawn", 1, manager.getAllMeters().size());
+ assertEquals("The meter was not withdrawn", 1, manager.getMeters(PROGRAMMABLE_DID).size());
+
+ pushMetrics(MeterOperation.Type.REMOVE, withdrawingMeter);
+ assertNull(manager.getMeter(PROGRAMMABLE_DID, cid0));
+ assertEquals("The meter was not removed", 0, manager.getAllMeters().size());
+ assertEquals("The meter was not removed", 0, manager.getMeters(PROGRAMMABLE_DID).size());
+ }
+
+ /**
+ * Test add multiple devices.
*/
@Test
public void testAddMultipleDevice() {
@@ -404,7 +559,7 @@
}
/**
- * Test remove meter.
+ * Test remove multiple devices.
*/
@Test
public void testRemoveMultipleDevice() {
@@ -460,6 +615,9 @@
assertEquals("The meter was not purged", 0, manager.getMeters(did1).size());
}
+ /**
+ * Test submit for programmable devices.
+ */
@Test
public void testAddFromMeterProgrammable() {
// Init store
@@ -471,6 +629,9 @@
});
}
+ /**
+ * Test batch submission for meter programmable.
+ */
@Test
public void testAddBatchFromMeterProgrammable() {
// Init store
@@ -484,6 +645,9 @@
}
+ /**
+ * Verify get from meter programmable.
+ */
@Test
public void testGetFromMeterProgrammable() {
// Init store
@@ -498,6 +662,89 @@
});
}
+ /**
+ * Verify installation of missing meters when using meter programmable devices.
+ */
+ @Test
+ public void testMissingFromMeterProgrammable() {
+ // Workaround when running the tests all together
+ meterOperations.clear();
+ testGetFromMeterProgrammable();
+
+ assertThat(meterOperations.size(), is(1));
+ manager.submit(mProgrammableRequest2.add());
+ TestTools.assertAfter(500, () -> {
+ assertEquals("The meter was not added", 2, manager.getAllMeters().size());
+ assertThat(manager.getMeter(PROGRAMMABLE_DID, MeterId.meterId(2)), is(mProgrammable2));
+ assertThat(meterOperations.size(), is(2));
+ assertThat(meterOperations.get(meterOperations.size() - 1), is(new MeterOperation(mProgrammable2,
+ MeterOperation.Type.ADD)));
+ });
+
+ TestTools.assertAfter(2000, () -> {
+ assertEquals("The meter was not added", 2, manager.getAllMeters().size());
+ Meter pendingMeter = manager.getMeter(PROGRAMMABLE_DID, MeterId.meterId(2));
+ assertThat(pendingMeter, is(mProgrammable2));
+ assertEquals("incorrect state", MeterState.PENDING_ADD, pendingMeter.state());
+ assertThat(meterOperations.size(), is(3));
+ assertThat(meterOperations.get(meterOperations.size() - 1), is(new MeterOperation(mProgrammable2,
+ MeterOperation.Type.ADD)));
+ });
+ }
+
+ /**
+ * Verify removal of unknown meters when using meter programmable devices.
+ */
+ @Test
+ public void testUnknownFromMeterProgrammable() {
+ meterOperations.clear();
+ testGetFromMeterProgrammable();
+ TestMeterProgrammable.unknownMeter = true;
+
+ assertThat(meterOperations.size(), is(1));
+ TestTools.assertAfter(2000, () -> {
+ assertEquals("The meter was not added", 1, manager.getAllMeters().size());
+ Meter pendingMeter = manager.getMeter(PROGRAMMABLE_DID, MeterId.meterId(2));
+ assertNull(pendingMeter);
+ assertThat(meterOperations.size(), is(2));
+ assertThat(meterOperations.get(meterOperations.size() - 1), is(new MeterOperation(mProgrammable2,
+ MeterOperation.Type.REMOVE)));
+ });
+ }
+
+ /**
+ * Verify removal of meters when using meter programmable devices.
+ */
+ @Test
+ public void testRemoveFromMeterProgrammable() {
+ testEnableUserDefinedIndex();
+ initMeterStore();
+ MeterDriverProvider fallback = (MeterDriverProvider) manager.defaultProvider();
+ fallback.init(manager.deviceService, fallback.meterProviderService, manager.mastershipService, 1);
+
+ manager.submit(mProgrammableRequest2.withIndex(2L).add());
+ TestTools.assertAfter(500, () -> {
+ assertEquals("The meter was not added", 1, manager.getAllMeters().size());
+ Meter pendingMeter = manager.getMeter(PROGRAMMABLE_DID, MeterId.meterId(2));
+ assertThat(pendingMeter, is(mProgrammable2));
+ assertEquals("incorrect state", MeterState.PENDING_ADD, pendingMeter.state());
+ });
+
+ manager.withdraw(mProgrammableRequest2.remove(), MeterId.meterId(2));
+ TestTools.assertAfter(500, () -> {
+ assertEquals("The meter was not withdrawn", 1, manager.getAllMeters().size());
+ Meter pendingMeter = manager.getMeter(PROGRAMMABLE_DID, MeterId.meterId(2));
+ assertThat(pendingMeter, is(mProgrammable2));
+ assertEquals("incorrect state", MeterState.PENDING_REMOVE, pendingMeter.state());
+ });
+
+ TestTools.assertAfter(2000, () -> {
+ assertEquals("The meter was not removed", 0, manager.getAllMeters().size());
+ Meter pendingMeter = manager.getMeter(PROGRAMMABLE_DID, MeterId.meterId(2));
+ assertNull(pendingMeter);
+ });
+ }
+
// Test cluster service
private final class TestClusterService extends ClusterServiceAdapter {
@@ -563,6 +810,8 @@
public static class TestMeterProgrammable extends AbstractHandlerBehaviour
implements MeterProgrammable {
+ private static boolean unknownMeter;
+
@Override
public CompletableFuture<Boolean> performMeterOperation(MeterOperation meterOp) {
return CompletableFuture.completedFuture(meterOperations.add(meterOp));
@@ -571,9 +820,16 @@
@Override
public CompletableFuture<Collection<Meter>> getMeters() {
//ADD METER
+ Collection<Meter> meters = Lists.newArrayList();
DefaultMeter mProgrammableAdded = (DefaultMeter) mProgrammable;
mProgrammableAdded.setState(MeterState.ADDED);
- return CompletableFuture.completedFuture(ImmutableList.of(mProgrammableAdded));
+ meters.add(mProgrammableAdded);
+ if (unknownMeter) {
+ DefaultMeter mProgrammable2Added = (DefaultMeter) mProgrammable2;
+ mProgrammable2Added.setState(MeterState.ADDED);
+ meters.add(mProgrammable2Added);
+ }
+ return CompletableFuture.completedFuture(meters);
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
index 7f7873c..afc6d94 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
@@ -83,6 +83,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.onosproject.store.meter.impl.DistributedMeterStore.ReuseStrategy.FIRST_FIT;
import static org.onosproject.net.meter.MeterFailReason.TIMEOUT;
import static org.onosproject.net.meter.MeterCellId.MeterCellType.INDEX;
@@ -115,7 +116,7 @@
// Meters id related objects
private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
// Available meter identifiers
- private ConcurrentMap<MeterTableKey, DistributedSet<MeterKey>> availableMeterIds;
+ protected ConcurrentMap<MeterTableKey, DistributedSet<MeterKey>> availableMeterIds;
// Atomic counter map for generation of new identifiers;
private static final String METERIDSTORE = "onos-meters-id-store";
private AtomicCounterMap<MeterTableKey> meterIdGenerators;
@@ -130,7 +131,12 @@
.register(Band.Type.class)
.register(MeterState.class)
.register(Meter.Unit.class)
- .register(MeterFailReason.class);
+ .register(MeterFailReason.class)
+ .register(MeterTableKey.class)
+ .register(MeterFeatures.class)
+ .register(DefaultMeterFeatures.class)
+ .register(MeterFeaturesFlag.class)
+ .register(MeterScope.class);
private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -143,6 +149,9 @@
private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
Maps.newConcurrentMap();
+ // Control the user defined index mode for the store.
+ protected boolean userDefinedIndexMode = false;
+
/**
* Defines possible selection strategies to reuse meter ids.
*/
@@ -170,16 +179,7 @@
metersFeatures = storageService.<MeterTableKey, MeterFeatures>eventuallyConsistentMapBuilder()
.withName(METERFEATURESSTORE)
.withTimestampProvider((key, features) -> new WallClockTimestamp())
- .withSerializer(KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(MeterTableKey.class)
- .register(MeterFeatures.class)
- .register(DefaultMeterFeatures.class)
- .register(DefaultBand.class)
- .register(Band.Type.class)
- .register(Meter.Unit.class)
- .register(MeterFailReason.class)
- .register(MeterFeaturesFlag.class)).build();
+ .withSerializer(APP_KRYO_BUILDER).build();
metersFeatures.addListener(featuresMapListener);
// Init the map of the available ids set
// Set will be created when a new Meter Features is pushed to the store
@@ -190,6 +190,7 @@
.withSerializer(Serializer.using(KryoNamespaces.API,
MeterTableKey.class,
MeterScope.class)).build();
+
log.info("Started");
}
@@ -199,15 +200,15 @@
metersFeatures.removeListener(featuresMapListener);
meters.destroy();
metersFeatures.destroy();
- availableMeterIds.forEach((key, set) -> {
- set.destroy();
- });
+ availableMeterIds.forEach((key, set) -> set.destroy());
+
log.info("Stopped");
}
@Override
public CompletableFuture<MeterStoreResult> addOrUpdateMeter(Meter meter) {
- // Init steps
+ // Verify integrity of the index
+ checkArgument(validIndex(meter), "Meter index is not valid");
CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
MeterData data = new MeterData(meter, null);
@@ -227,23 +228,7 @@
@Override
public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
- // Init steps
- CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
- MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
- // Store the future related to the operation
- futures.put(key, future);
- // Store the meter data
- MeterData data = new MeterData(meter, null);
- try {
- meters.put(key, data);
- } catch (StorageException e) {
- log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
- e.getMessage(), e);
- futures.remove(key);
- future.completeExceptionally(e);
- }
- // Done, return the future
- return future;
+ return addOrUpdateMeter(meter);
}
@Override
@@ -309,9 +294,7 @@
Set<MeterTableKey> keys = metersFeatures.keySet().stream()
.filter(key -> key.deviceId().equals(deviceId))
.collect(Collectors.toUnmodifiableSet());
- keys.forEach(k -> {
- metersFeatures.remove(k);
- });
+ keys.forEach(k -> metersFeatures.remove(k));
} catch (StorageException e) {
log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
e.getMessage(), e);
@@ -342,27 +325,6 @@
}
@Override
- // TODO Should we remove it ? We are not using it
- public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
- CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
- MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
- futures.put(key, future);
-
- MeterData data = new MeterData(meter, null);
- try {
- if (meters.computeIfPresent(key, (k, v) -> data) == null) {
- future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
- }
- } catch (StorageException e) {
- log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
- e.getMessage(), e);
- futures.remove(key);
- future.completeExceptionally(e);
- }
- return future;
- }
-
- @Override
public Meter updateMeterState(Meter meter) {
// Update meter if present (stats workflow)
MeterKey key = MeterKey.key(meter.deviceId(), meter.meterCellId());
@@ -405,7 +367,7 @@
@Override
public void failedMeter(MeterOperation op, MeterFailReason reason) {
// Meter ops failed (got notification from the sb)
- MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
+ MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().meterCellId());
meters.computeIfPresent(key, (k, v) -> new MeterData(v.meter(), reason));
}
@@ -419,7 +381,7 @@
public void purgeMeter(Meter m) {
// Once we receive the ack from the sb
// create the key and remove definitely the meter
- MeterKey key = MeterKey.key(m.deviceId(), m.id());
+ MeterKey key = MeterKey.key(m.deviceId(), m.meterCellId());
try {
if (Versioned.valueOrNull(meters.remove(key)) != null) {
// Free the id
@@ -441,6 +403,12 @@
@Override
public void purgeMeter(DeviceId deviceId) {
+ // This method is renamed in onos-2.5
+ purgeMeters(deviceId);
+ }
+
+ @Override
+ public void purgeMeters(DeviceId deviceId) {
List<Versioned<MeterData>> metersPendingRemove = meters.stream()
.filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
.map(Map.Entry::getValue)
@@ -456,7 +424,19 @@
e.getValue().value().meter().appId().equals(appId))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
- metersPendingRemove.forEach(versionedMeterKey -> deleteMeterNow(versionedMeterKey.value().meter()));
+ metersPendingRemove.forEach(versionedMeterKey
+ -> purgeMeter(versionedMeterKey.value().meter()));
+ }
+
+ @Override
+ public boolean userDefinedIndexMode(boolean enable) {
+ if (meters.isEmpty() && meterIdGenerators.isEmpty()) {
+ userDefinedIndexMode = enable;
+ } else {
+ log.warn("Unable to {} user defined index mode as store did" +
+ "already some allocations", enable ? "activate" : "deactivate");
+ }
+ return userDefinedIndexMode;
}
@Override
@@ -473,6 +453,28 @@
return features == null ? 0L : features.maxMeter();
}
+ private boolean validIndex(Meter meter) {
+ long index;
+ MeterTableKey key;
+
+ if (meter.meterCellId().type() == PIPELINE_INDEPENDENT) {
+ PiMeterCellId piMeterCellId = (PiMeterCellId) meter.meterCellId();
+ index = piMeterCellId.index();
+ key = MeterTableKey.key(meter.deviceId(), MeterScope.of(piMeterCellId.meterId().id()));
+ } else if (meter.meterCellId().type() == INDEX) {
+ MeterId meterId = (MeterId) meter.meterCellId();
+ index = meterId.id();
+ key = MeterTableKey.key(meter.deviceId(), MeterScope.globalScope());
+ } else {
+ return false;
+ }
+
+ MeterFeatures features = metersFeatures.get(key);
+ long startIndex = features == null ? -1L : features.startIndex();
+ long endIndex = features == null ? -1L : features.endIndex();
+ return index >= startIndex && index <= endIndex;
+ }
+
private long getStartIndex(MeterTableKey key) {
// Leverage the meter features to know the start id
// Since we are using index now
@@ -556,7 +558,7 @@
Set<MeterCellId> localAvailableMeterIds = keySet.stream()
.filter(meterKey ->
meterKey.deviceId().equals(meterTableKey.deviceId()))
- .map(MeterKey::meterId)
+ .map(MeterKey::meterCellId)
.collect(Collectors.toSet());
// Get next available id
MeterCellId meterId = getNextAvailableId(localAvailableMeterIds);
@@ -584,6 +586,10 @@
@Override
public MeterCellId allocateMeterId(DeviceId deviceId, MeterScope meterScope) {
+ if (userDefinedIndexMode) {
+ log.warn("Unable to allocate meter id when user defined index mode is enabled");
+ return null;
+ }
MeterTableKey meterTableKey = MeterTableKey.key(deviceId, meterScope);
MeterCellId meterCellId;
long id;
@@ -638,6 +644,10 @@
}
private void freeMeterId(MeterTableKey meterTableKey, MeterCellId meterCellId) {
+ if (userDefinedIndexMode) {
+ log.warn("Unable to free meter id when user defined index mode is enabled");
+ return;
+ }
long index;
if (meterCellId.type() == PIPELINE_INDEPENDENT) {
PiMeterCellId piMeterCellId = (PiMeterCellId) meterCellId;
diff --git a/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
index 0527755..37ff7b4 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
@@ -21,10 +21,8 @@
import org.junit.Before;
import org.junit.Test;
import org.onlab.junit.TestUtils;
-import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
import org.onosproject.TestApplicationId;
-import org.onosproject.cluster.NodeId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.behaviour.MeterQuery;
import org.onosproject.net.driver.Behaviour;
@@ -37,11 +35,16 @@
import org.onosproject.net.meter.DefaultMeter;
import org.onosproject.net.meter.DefaultMeterFeatures;
import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterCellId;
import org.onosproject.net.meter.MeterFeatures;
import org.onosproject.net.meter.MeterFeaturesKey;
import org.onosproject.net.meter.MeterId;
import org.onosproject.net.meter.MeterKey;
+import org.onosproject.net.meter.MeterScope;
import org.onosproject.net.meter.MeterState;
+import org.onosproject.net.meter.MeterTableKey;
+import org.onosproject.net.pi.model.PiMeterId;
+import org.onosproject.net.pi.runtime.PiMeterCellId;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TestStorageService;
@@ -61,13 +64,6 @@
* Meter store tests.
*/
public class DistributedMeterStoreTest {
-
- // Test node id
- private static final NodeId NID_LOCAL = new NodeId("local");
-
- // Test ip address
- private static final IpAddress LOCALHOST = IpAddress.valueOf("127.0.0.1");
-
// Store under testing
private DistributedMeterStore meterStore;
@@ -82,6 +78,10 @@
private MeterId mid2 = MeterId.meterId(2);
private MeterId mid3 = MeterId.meterId(3);
private MeterId mid10 = MeterId.meterId(10);
+ private MeterCellId cid4 = PiMeterCellId.ofIndirect(
+ PiMeterId.of("foo"), 4);
+ private MeterCellId invalidCid = PiMeterCellId.ofIndirect(
+ PiMeterId.of("foo"), 11);
// Bands used during the tests
private Band b1 = DefaultBand.builder()
@@ -114,6 +114,14 @@
.withBands(Collections.singletonList(b1))
.build();
+ private Meter m4 = DefaultMeter.builder()
+ .forDevice(did3)
+ .fromApp(APP_ID)
+ .withCellId(cid4)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+
// Meter features used during the tests
private MeterFeatures mef1 = DefaultMeterFeatures.builder().forDevice(did1)
.withMaxMeters(3L)
@@ -133,6 +141,17 @@
.withMaxBands((byte) 0)
.withMaxColors((byte) 0)
.build();
+ private MeterFeatures mef3 = DefaultMeterFeatures.builder().forDevice(did3)
+ .withStartIndex(0L)
+ .withEndIndex(10L)
+ .withScope(MeterScope.of("foo"))
+ .withBandTypes(new HashSet<>())
+ .withUnits(new HashSet<>())
+ .hasStats(false)
+ .hasBurst(false)
+ .withMaxBands((byte) 0)
+ .withMaxColors((byte) 0)
+ .build();
@Before
public void setup() {
@@ -158,11 +177,13 @@
meterStore.deactivate();
}
- private void initMeterStore() {
+ private void initMeterStore(boolean enableUserDefinedIndex) {
+ meterStore.userDefinedIndexMode(enableUserDefinedIndex);
// Let's store feature for device 1
meterStore.storeMeterFeatures(mef1);
// Let's store feature for device 2
meterStore.storeMeterFeatures(mef2);
+ meterStore.storeMeterFeatures(mef3);
}
/**
@@ -201,7 +222,7 @@
@Test
public void testAllocateId() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Allocate a meter id and verify is equal to mid1
assertThat(mid1, is(meterStore.allocateMeterId(did1)));
// Allocate a meter id and verify is equal to mid2
@@ -214,7 +235,7 @@
@Test
public void testFreeId() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Allocate a meter id and verify is equal to mid1
assertThat(mid1, is(meterStore.allocateMeterId(did1)));
// Free the above id
@@ -233,7 +254,7 @@
@Test
public void testReuseId() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Reserve id 1
MeterId meterIdOne = meterStore.allocateMeterId(did2);
// Free the above id
@@ -291,7 +312,7 @@
@Test
public void testQueryMeters() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Let's test queryMeters
assertThat(mid1, is(meterStore.allocateMeterId(did3)));
// Let's test queryMeters error
@@ -304,7 +325,7 @@
@Test
public void testMaxMeterError() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Reserve id 1
assertThat(mid1, is(meterStore.allocateMeterId(did1)));
// Reserve id 2
@@ -319,7 +340,7 @@
@Test
public void testStoreMeter() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Simulate the allocation of an id
MeterId idOne = meterStore.allocateMeterId(did1);
// Verify the allocation
@@ -350,7 +371,7 @@
@Test
public void testDeleteMeter() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Simulate the allocation of an id
MeterId idOne = meterStore.allocateMeterId(did1);
// Verify the allocation
@@ -396,7 +417,7 @@
@Test
public void testNoDeleteMeter() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Simulate the allocation of an id
MeterId idOne = meterStore.allocateMeterId(did1);
// Create the key
@@ -440,7 +461,7 @@
@Test
public void testPurgeMeterDeviceAndApp() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// add the meters
((DefaultMeter) m1).setState(MeterState.PENDING_ADD);
((DefaultMeter) m2).setState(MeterState.PENDING_ADD);
@@ -465,7 +486,7 @@
@Test
public void testGetMetersImmutability() {
// Init the store
- initMeterStore();
+ initMeterStore(false);
// Simulate the allocation of an id
MeterId idOne = meterStore.allocateMeterId(did1);
@@ -513,8 +534,144 @@
metersDevice = meterStore.getAllMeters(did1);
assertThat(2, is(meters.size()));
assertThat(2, is(metersDevice.size()));
+ }
+ /**
+ * Test invalid allocation of a cell id.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidCellId() {
+ initMeterStore(true);
+ // MF defines an end index equals to 10
+ Meter meterBad = DefaultMeter.builder()
+ .forDevice(did3)
+ .fromApp(APP_ID)
+ .withCellId(invalidCid)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+ ((DefaultMeter) meterBad).setState(MeterState.PENDING_ADD);
+ meterStore.storeMeter(meterBad);
+ }
+ /**
+ * Test enabling user defined index mode.
+ */
+ @Test
+ public void testEnableUserDefinedIndex() {
+ initMeterStore(false);
+ assertTrue(meterStore.userDefinedIndexMode(true));
+ }
+
+ /**
+ * Test invalid enabling user defined index mode.
+ */
+ @Test
+ public void testInvalidEnableUserDefinedIndex() {
+ testStoreMeter();
+ assertFalse(meterStore.userDefinedIndexMode(true));
+ }
+
+ /**
+ * Test disabling user defined index mode.
+ */
+ @Test
+ public void testDisableUserDefinedIndex() {
+ initMeterStore(true);
+ assertFalse(meterStore.userDefinedIndexMode(false));
+ }
+
+ /**
+ * Test store meter in user defined index mode.
+ */
+ @Test
+ public void testStoreMeterInUserDefinedIndexMode() {
+ initMeterStore(true);
+ // Let's create a meter
+ Meter meterOne = DefaultMeter.builder()
+ .forDevice(did3)
+ .fromApp(APP_ID)
+ .withCellId(cid4)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+ // Set the state
+ ((DefaultMeter) meterOne).setState(MeterState.PENDING_ADD);
+ // Store the meter
+ meterStore.storeMeter(meterOne);
+ // Let's create meter key
+ MeterKey meterKey = MeterKey.key(did3, cid4);
+ // Verify the store
+ assertThat(1, is(meterStore.getAllMeters().size()));
+ assertThat(1, is(meterStore.getAllMeters(did3).size()));
+ assertThat(m4, is(meterStore.getMeter(meterKey)));
+ }
+
+ /**
+ * Test invalid disabling user defined index mode.
+ */
+ @Test
+ public void testInvalidDisableUserDefinedIndex() {
+ testStoreMeterInUserDefinedIndexMode();
+ assertTrue(meterStore.userDefinedIndexMode(false));
+ }
+
+ /**
+ * Test allocation of meter ids in user defined index mode.
+ */
+ @Test
+ public void testAllocateIdInUserDefinedIndexMode() {
+ initMeterStore(true);
+ assertNull(meterStore.allocateMeterId(did1));
+ }
+
+ /**
+ * Test free of meter ids in user defined index mode.
+ */
+ @Test
+ public void testFreeIdInUserMode() {
+ initMeterStore(true);
+ // Free the id and expect not being available
+ meterStore.freeMeterId(did1, mid1);
+ MeterTableKey globalKey = MeterTableKey.key(did1, MeterScope.globalScope());
+ assertNotNull(meterStore.availableMeterIds.get(globalKey));
+ assertTrue(meterStore.availableMeterIds.get(globalKey).isEmpty());
+ }
+
+ /**
+ * Test delete meter in user defined index mode.
+ */
+ @Test
+ public void testDeleteMeterInUserDefinedIndexMode() {
+ initMeterStore(true);
+ Meter meterOne = DefaultMeter.builder()
+ .forDevice(did3)
+ .fromApp(APP_ID)
+ .withCellId(cid4)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+ ((DefaultMeter) meterOne).setState(MeterState.PENDING_ADD);
+ meterStore.storeMeter(meterOne);
+
+ ((DefaultMeter) meterOne).setState(MeterState.PENDING_REMOVE);
+ MeterKey meterKey = MeterKey.key(did3, cid4);
+ meterStore.deleteMeter(meterOne);
+ CompletableFuture<Void> future = CompletableFuture.runAsync(
+ () -> meterStore.purgeMeter(meterOne)
+ );
+
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ }
+ assertThat(0, is(meterStore.getAllMeters().size()));
+ assertThat(0, is(meterStore.getAllMeters(did3).size()));
+ assertNull(meterStore.getMeter(meterKey));
+ MeterTableKey globalKey = MeterTableKey.key(did1, MeterScope.globalScope());
+ assertNotNull(meterStore.availableMeterIds.get(globalKey));
+ assertTrue(meterStore.availableMeterIds.get(globalKey).isEmpty());
}
// Test class for driver service.