[AETHER-432][VOL-3263] Revisit meters subsystem to be fully distributed.
- Events are now propagated across the cluster.
- REF_COUNT_ZERO is now generated after the first stats interval.
- SB ops are offloaded to the installers.
- Installers use predictable executors.
- Deprecated MeterData API that exposes origin node and removed unused code.
- Comments are added to parts of the code that are obscure.
- MeterManager and DistributedMeterStore unit tests are improved.
- Fix an issue in TestConsistentMap.
Change-Id: I0329f903e5fdc421f29ee33f8f8133f18c794d8f
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMap.java b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMap.java
index aa08c8d..84d1734 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMap.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMap.java
@@ -28,6 +28,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.onosproject.store.primitives.ConsistentMapBackedJavaMap;
@@ -278,6 +279,11 @@
}
@Override
+ public Stream<Map.Entry<K, Versioned<V>>> stream() {
+ return map.entrySet().stream();
+ }
+
+ @Override
public void addListener(MapEventListener<K, V> listener) {
listeners.add(listener);
}
diff --git a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
index 1979c61..49a889d 100644
--- a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
+++ b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
@@ -15,9 +15,13 @@
*/
package org.onosproject.net.meter.impl;
+import org.onlab.util.PredictableExecutor;
+import org.onlab.util.PredictableExecutor.PickyRunnable;
import org.onlab.util.Tools;
import org.onlab.util.TriConsumer;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.config.NetworkConfigRegistry;
@@ -59,12 +63,12 @@
import java.util.Collection;
import java.util.Dictionary;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
+import java.util.Objects;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.onlab.util.PredictableExecutor.newPredictableExecutor;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.OsgiPropertyConstants.MM_FALLBACK_METER_POLL_FREQUENCY;
@@ -93,10 +97,11 @@
public class MeterManager
extends AbstractListenerProviderRegistry<MeterEvent, MeterListener, MeterProvider, MeterProviderService>
implements MeterService, MeterProviderRegistry {
-
+ // Installer related objects
+ private PredictableExecutor meterInstallers;
private static final String WORKER_PATTERN = "installer-%d";
private static final String GROUP_THREAD_NAME = "onos/meter";
-
+ // Logging facility, meter store delegate and listener for device events.
private final Logger log = getLogger(getClass());
private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate();
private final DeviceListener deviceListener = new InternalDeviceListener();
@@ -119,7 +124,11 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected NetworkConfigRegistry netCfgService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
/** Number of worker threads. */
+ // TODO Set 0 to use the available processors
private int numThreads = MM_NUM_THREADS_DEFAULT;
/** Frequency (in seconds) for polling meters via fallback provider. */
@@ -128,19 +137,28 @@
/** Purge entries associated with a device when the device goes offline. */
private boolean purgeOnDisconnection = MM_PURGE_ON_DISCONNECTION_DEFAULT;
+ // Action triggered when the futures related to submit and withdrawal complete
private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
- private ExecutorService executorService;
-
+ // Meter provider reference
private final MeterDriverProvider defaultProvider = new MeterDriverProvider();
+ // Node id used to verify who is charge of the meter ops
+ // (usually one node can modify the internal state of the device)
+ private NodeId local;
+
@Activate
public void activate(ComponentContext context) {
store.setDelegate(delegate);
cfgService.registerProperties(getClass());
eventDispatcher.addSink(MeterEvent.class, listenerRegistry);
deviceService.addListener(deviceListener);
-
+ local = clusterService.getLocalNode().id();
+ // Consumer logic is the following:
+ // if there is an exceptional end (storage exception), on error is called
+ // else if there is a reason for the failure, on error is called with the reason
+ // else if the reason is empty, on success is called
+ // In all the cases the meter context code is consumed
onComplete = (request, result, error) -> {
request.context().ifPresent(c -> {
if (error != null) {
@@ -153,12 +171,9 @@
}
}
});
-
};
modified(context);
- executorService = newFixedThreadPool(numThreads,
- groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
log.info("Started");
}
@@ -178,7 +193,7 @@
eventDispatcher.removeSink(MeterEvent.class);
deviceService.removeListener(deviceListener);
cfgService.unregisterProperties(getClass(), false);
- executorService.shutdown();
+ meterInstallers.shutdown();
log.info("Stopped");
}
@@ -215,6 +230,11 @@
} catch (NumberFormatException e) {
numThreads = MM_NUM_THREADS_DEFAULT;
}
+ if (meterInstallers != null) {
+ meterInstallers.shutdown();
+ }
+ meterInstallers = newPredictableExecutor(numThreads,
+ groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
}
@Override
@@ -230,19 +250,20 @@
@Override
public Meter submit(MeterRequest request) {
checkNotNull(request, "request cannot be null.");
+ // Allocate an id and then submit the request
MeterId id = allocateMeterId(request.deviceId());
-
Meter.Builder mBuilder = DefaultMeter.builder()
.forDevice(request.deviceId())
.fromApp(request.appId())
.withBands(request.bands())
.withCellId(id)
.withUnit(request.unit());
-
if (request.isBurst()) {
mBuilder.burst();
}
DefaultMeter m = (DefaultMeter) mBuilder.build();
+ // Meter installation logic (happy ending case)
+ // PENDING -> stats -> ADDED -> future completes
m.setState(MeterState.PENDING_ADD);
store.storeMeter(m).whenComplete((result, error) ->
onComplete.accept(request, result, error));
@@ -262,8 +283,10 @@
if (request.isBurst()) {
mBuilder.burst();
}
-
DefaultMeter m = (DefaultMeter) mBuilder.build();
+ // Meter removal logic (happy ending case)
+ // PENDING -> stats -> removed from the map -> future completes
+ // There is no transition to the REMOVED state
m.setState(MeterState.PENDING_REMOVE);
store.deleteMeter(m).whenComplete((result, error) ->
onComplete.accept(request, result, error));
@@ -288,19 +311,19 @@
@Override
public MeterId allocateMeterId(DeviceId deviceId) {
- // We delegate direclty to the store
+ // We delegate directly to the store
return store.allocateMeterId(deviceId);
}
@Override
public void freeMeterId(DeviceId deviceId, MeterId meterId) {
- // We delegate direclty to the store
+ // We delegate directly to the store
store.freeMeterId(deviceId, meterId);
}
@Override
public void purgeMeters(DeviceId deviceId) {
- // We delegate direclty to the store
+ // We delegate directly to the store
store.purgeMeter(deviceId);
}
@@ -325,6 +348,7 @@
@Override
public void pushMeterMetrics(DeviceId deviceId, Collection<Meter> meterEntries) {
+ // Each update on the store is reflected on this collection
Collection<Meter> allMeters = store.getAllMeters(deviceId);
Map<MeterId, Meter> meterEntriesMap = meterEntries.stream()
@@ -337,8 +361,8 @@
!meterEntriesMap.containsKey(m.id())) {
// The meter is missing in the device. Reinstall!
log.debug("Adding meter missing in device {} {}", deviceId, m);
- provider().performMeterOperation(deviceId,
- new MeterOperation(m, MeterOperation.Type.ADD));
+ // offload the task to avoid the overloading of the sb threads
+ meterInstallers.execute(new MeterInstaller(deviceId, m, MeterOperation.Type.ADD));
}
});
@@ -347,7 +371,7 @@
.filter(md -> !allMeters.stream().anyMatch(m -> m.id().equals(md.getKey())))
.forEach(mio -> {
Meter meter = mio.getValue();
- // FIXME: Removing a meter is meaningfull for OpenFlow, but not for P4Runtime.
+ // FIXME: Removing a meter is meaningful for OpenFlow, but not for P4Runtime.
// In P4Runtime meter cells cannot be removed. For the
// moment, we make the distinction between OpenFlow and
// P4Runtime by looking at the MeterCellType (always
@@ -355,21 +379,29 @@
if (meter.meterCellId().type() == MeterCellType.INDEX) {
// The meter is missing in onos. Uninstall!
log.debug("Remove meter in device not in onos {} {}", deviceId, mio.getKey());
- provider().performMeterOperation(deviceId,
- new MeterOperation(meter, MeterOperation.Type.REMOVE));
+ // offload the task to avoid the overloading of the sb threads
+ meterInstallers.execute(new MeterInstaller(deviceId, meter, MeterOperation.Type.REMOVE));
}
});
+ // Update the meter stats in the store (first time move the state from pending to added)
meterEntries.stream()
.filter(m -> allMeters.stream()
.anyMatch(sm -> sm.deviceId().equals(deviceId) && sm.id().equals(m.id())))
.forEach(m -> store.updateMeterState(m));
allMeters.forEach(m -> {
+ // FIXME: Installing a meter is meaningful for OpenFlow, but not for P4Runtime.
+ // It looks like this flow is used only for p4runtime to emulate the installation
+ // since meters are already instantiated - we need just modify the params.
if (m.state() == MeterState.PENDING_ADD) {
- provider().performMeterOperation(m.deviceId(),
- new MeterOperation(m,
- MeterOperation.Type.MODIFY));
+ // offload the task to avoid the overloading of the sb threads
+ meterInstallers.execute(new MeterInstaller(m.deviceId(), m, MeterOperation.Type.MODIFY));
+ // Remove workflow. Regarding OpenFlow, meters have been removed from
+ // the device but they are still in the store, we will purge them definitely.
+ // Instead, P4Runtime devices will not remove the meter. The first workaround
+ // for P4Runtime will avoid to send a remove op. Then, we reach this point
+ // and we purge the meter from the store
} else if (m.state() == MeterState.PENDING_REMOVE) {
store.deleteMeterNow(m);
}
@@ -393,25 +425,25 @@
public void notify(MeterEvent event) {
DeviceId deviceId = event.subject().deviceId();
switch (event.type()) {
+ // REQ events will trigger a modification in the device.
+ // Mastership check is performed inside the installer
+ // to avoid the blocking of the RAFT threads
case METER_ADD_REQ:
- executorService.execute(new MeterInstaller(deviceId, event.subject(),
+ meterInstallers.execute(new MeterInstaller(deviceId, event.subject(),
MeterOperation.Type.ADD));
break;
case METER_REM_REQ:
- executorService.execute(new MeterInstaller(deviceId, event.subject(),
+ meterInstallers.execute(new MeterInstaller(deviceId, event.subject(),
MeterOperation.Type.REMOVE));
break;
+ // Following events are forwarded to the apps subscribed for the meter events;
+ // installers are not involved in this task. In this case, the overhead for this op
+ // is almost null. Potentially we can introduce a store delegate thread.
case METER_ADDED:
- log.info("Meter added {}", event.subject());
- post(new MeterEvent(MeterEvent.Type.METER_ADDED, event.subject()));
- break;
case METER_REMOVED:
- log.info("Meter removed {}", event.subject());
- post(new MeterEvent(MeterEvent.Type.METER_REMOVED, event.subject()));
- break;
case METER_REFERENCE_COUNT_ZERO:
- log.debug("Meter reference count zero {}", event.subject());
- post(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO, event.subject()));
+ log.debug("Post {} event {}", event.type(), event.subject());
+ post(event);
break;
default:
log.warn("Unknown meter event {}", event.type());
@@ -419,10 +451,11 @@
}
}
+
/**
* Task that passes the meter down to the provider.
*/
- private class MeterInstaller implements Runnable {
+ private class MeterInstaller implements PickyRunnable {
private final DeviceId deviceId;
private final Meter meter;
private final MeterOperation.Type op;
@@ -435,6 +468,14 @@
@Override
public void run() {
+ // Check mastership and eventually execute the op on the device
+ log.debug("Meter {} request {}", op.name().toLowerCase(), meter);
+ NodeId master = mastershipService.getMasterFor(meter.deviceId());
+ if (!Objects.equals(local, master)) {
+ log.trace("Not the master of device {}, skipping installation of the meter {}",
+ meter.deviceId(), meter.id());
+ return;
+ }
MeterProvider p = getProvider(this.deviceId);
if (p == null) {
log.error("Unable to recover {}'s provider", deviceId);
@@ -442,6 +483,11 @@
}
p.performMeterOperation(deviceId, new MeterOperation(meter, op));
}
+
+ @Override
+ public int hint() {
+ return meter.id().hashCode();
+ }
}
private class InternalDeviceListener implements DeviceListener {
diff --git a/core/net/src/test/java/org/onosproject/net/meter/impl/MeterManagerTest.java b/core/net/src/test/java/org/onosproject/net/meter/impl/MeterManagerTest.java
index 030b9e6..52970dd 100644
--- a/core/net/src/test/java/org/onosproject/net/meter/impl/MeterManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/meter/impl/MeterManagerTest.java
@@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -73,6 +74,7 @@
import org.onosproject.store.meter.impl.DistributedMeterStore;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TestStorageService;
+import org.osgi.service.component.ComponentContext;
import java.util.ArrayList;
import java.util.Collection;
@@ -80,8 +82,12 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.Dictionary;
+import java.util.Hashtable;
import java.util.concurrent.CompletableFuture;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -115,7 +121,6 @@
new DefaultDevice(PROGRAMMABLE_PROVIDER, PROGRAMMABLE_DID, Device.Type.SWITCH,
"", "", "", "", null, ANNOTATIONS);
-
private MeterService service;
// Test Driver service used during the tests
@@ -232,8 +237,6 @@
meterStore = new DistributedMeterStore();
// Let's initialize some internal services of the store
TestUtils.setField(meterStore, "storageService", new TestStorageService());
- TestUtils.setField(meterStore, "clusterService", new TestClusterService());
- TestUtils.setField(meterStore, "mastershipService", new TestMastershipService());
TestUtils.setField(meterStore, "driverService", driverService);
// Inject TestApplicationId into the DistributedMeterStore serializer
@@ -252,16 +255,20 @@
manager.deviceService = deviceService;
manager.mastershipService = new TestMastershipService();
manager.cfgService = new ComponentConfigAdapter();
- TestUtils.setField(manager, "storageService", new TestStorageService());
+ manager.clusterService = new TestClusterService();
// Init the reference of the registry
registry = manager;
manager.driverService = driverService;
// Activate the manager
- manager.activate(null);
- // Initialize the test provider
+ Dictionary<String, Object> cfgDict = new Hashtable<>();
+ ComponentContext componentContext = EasyMock.createMock(ComponentContext.class);
+ expect(componentContext.getProperties()).andReturn(cfgDict);
+ replay(componentContext);
+ manager.activate(componentContext);
+ // Initialize the test provider
provider = new TestProvider(PID);
// Register the provider against the manager
providerService = registry.register(provider);
@@ -315,8 +322,8 @@
// Submit meter request
manager.submit(m1Request.add());
// Verify add
- assertTrue("The meter was not added", manager.getAllMeters().size() == 1);
- assertTrue("The meter was not added", manager.getMeters(did1).size() == 1);
+ assertEquals("The meter was not added", 1, manager.getAllMeters().size());
+ assertEquals("The meter was not added", 1, manager.getMeters(did1).size());
// Get Meter
Meter installingMeter = manager.getMeter(did1, mid1);
// Verify add of installingMeter and pending add state
@@ -329,8 +336,8 @@
Meter installedMeter = manager.getMeter(did1, mid1);
// Verify installation
assertThat(installedMeter.state(), is(MeterState.ADDED));
- assertTrue("The meter was not installed", manager.getAllMeters().size() == 1);
- assertTrue("The meter was not installed", manager.getMeters(did1).size() == 1);
+ assertEquals("The meter was not installed", 1, manager.getAllMeters().size());
+ assertEquals("The meter was not installed", 1, manager.getMeters(did1).size());
}
/**
@@ -348,14 +355,14 @@
Meter withdrawingMeter = manager.getMeter(did1, mid1);
// Verify withdrawing
assertThat(withdrawingMeter.state(), is(MeterState.PENDING_REMOVE));
- assertTrue("The meter was not withdrawn", manager.getAllMeters().size() == 1);
- assertTrue("The meter was not withdrawn", manager.getMeters(did1).size() == 1);
+ assertEquals("The meter was not withdrawn", 1, manager.getAllMeters().size());
+ assertEquals("The meter was not withdrawn", 1, manager.getMeters(did1).size());
// Let's simulate a working data-plane
pushMetrics(MeterOperation.Type.REMOVE, withdrawingMeter);
// Verify withdrawn
assertNull(manager.getMeter(did1, mid1));
- assertTrue("The meter was not removed", manager.getAllMeters().size() == 0);
- assertTrue("The meter was not removed", manager.getMeters(did1).size() == 0);
+ assertEquals("The meter was not removed", 0, manager.getAllMeters().size());
+ assertEquals("The meter was not removed", 0, manager.getMeters(did1).size());
}
/**
@@ -370,9 +377,9 @@
// Submit meter 2
manager.submit(m2Request.add());
// Verify add
- assertTrue("The meter was not added", manager.getAllMeters().size() == 2);
- assertTrue("The meter was not added", manager.getMeters(did1).size() == 1);
- assertTrue("The meter was not added", manager.getMeters(did2).size() == 1);
+ assertEquals("The meter was not added", 2, manager.getAllMeters().size());
+ assertEquals("The meter was not added", 1, manager.getMeters(did1).size());
+ assertEquals("The meter was not added", 1, manager.getMeters(did2).size());
// Get Meters
Meter installingMeter1 = manager.getMeter(did1, mid1);
Meter installingMeter2 = manager.getMeter(did2, mid1);
@@ -391,9 +398,9 @@
// Verify installation
assertThat(installedMeter1.state(), is(MeterState.ADDED));
assertThat(installedMeter2.state(), is(MeterState.ADDED));
- assertTrue("The meter was not installed", manager.getAllMeters().size() == 2);
- assertTrue("The meter was not installed", manager.getMeters(did1).size() == 1);
- assertTrue("The meter was not installed", manager.getMeters(did2).size() == 1);
+ assertEquals("The meter was not installed", 2, manager.getAllMeters().size());
+ assertEquals("The meter was not installed", 1, manager.getMeters(did1).size());
+ assertEquals("The meter was not installed", 1, manager.getMeters(did2).size());
}
/**
@@ -417,73 +424,78 @@
// Verify withdrawing
assertThat(withdrawingMeter1.state(), is(MeterState.PENDING_REMOVE));
assertThat(withdrawingMeter2.state(), is(MeterState.PENDING_REMOVE));
- assertTrue("The meter was not withdrawn", manager.getAllMeters().size() == 2);
- assertTrue("The meter was not withdrawn", manager.getMeters(did1).size() == 1);
- assertTrue("The meter was not withdrawn", manager.getMeters(did2).size() == 1);
+ assertEquals("The meter was not withdrawn", 2, manager.getAllMeters().size());
+ assertEquals("The meter was not withdrawn", 1, manager.getMeters(did1).size());
+ assertEquals("The meter was not withdrawn", 1, manager.getMeters(did2).size());
// Let's simulate a working data-plane
pushMetrics(MeterOperation.Type.REMOVE, withdrawingMeter1);
pushMetrics(MeterOperation.Type.REMOVE, withdrawingMeter2);
// Verify withdrawn
assertNull(manager.getMeter(did1, mid1));
assertNull(manager.getMeter(did2, mid1));
- assertTrue("The meter was not removed", manager.getAllMeters().size() == 0);
- assertTrue("The meter was not removed", manager.getMeters(did1).size() == 0);
- assertTrue("The meter was not removed", manager.getMeters(did2).size() == 0);
+ assertEquals("The meter was not removed", 0, manager.getAllMeters().size());
+ assertEquals("The meter was not removed", 0, manager.getMeters(did1).size());
+ assertEquals("The meter was not removed", 0, manager.getMeters(did2).size());
+ }
+
+ /**
+ * Test purge meter.
+ */
+ @Test
+ public void testPurge() {
+ // Init store
+ initMeterStore();
+ // Submit meter request
+ manager.submit(m1Request.add());
+ // Verify submit
+ Meter submittingMeter = manager.getMeter(did1, mid1);
+ assertThat(submittingMeter.state(), is(MeterState.PENDING_ADD));
+ assertEquals("The meter was not added", 1, manager.getAllMeters().size());
+ assertEquals("The meter was not added", 1, manager.getMeters(did1).size());
+ // Purge the meters
+ manager.purgeMeters(did1);
+ // Verify purge
+ assertNull(manager.getMeter(did1, mid1));
+ assertEquals("The meter was not purged", 0, manager.getAllMeters().size());
+ assertEquals("The meter was not purged", 0, manager.getMeters(did1).size());
}
@Test
public void testAddFromMeterProgrammable() {
-
// Init store
initMeterStore();
-
manager.submit(mProgrammableRequest.add());
-
TestTools.assertAfter(500, () -> {
-
- assertTrue("The meter was not added", manager.getAllMeters().size() == 1);
-
+ assertEquals("The meter was not added", 1, manager.getAllMeters().size());
assertThat(manager.getMeter(PROGRAMMABLE_DID, MeterId.meterId(1)), is(mProgrammable));
});
-
}
@Test
public void testAddBatchFromMeterProgrammable() {
-
// Init store
initMeterStore();
-
List<MeterOperation> operations = ImmutableList.of(new MeterOperation(mProgrammable, MeterOperation.Type.ADD));
manager.defaultProvider().performMeterOperation(PROGRAMMABLE_DID, new MeterOperations(operations));
-
TestTools.assertAfter(500, () -> {
-
- assertTrue("The meter was not added", meterOperations.size() == 1);
-
- assertTrue("Wrong Meter Operation", meterOperations.get(0).meter().id().equals(mProgrammable.id()));
+ assertEquals("The meter was not added", 1, meterOperations.size());
+ assertEquals("Wrong Meter Operation", meterOperations.get(0).meter().id(), mProgrammable.id());
});
}
@Test
public void testGetFromMeterProgrammable() {
-
// Init store
initMeterStore();
-
MeterDriverProvider fallback = (MeterDriverProvider) manager.defaultProvider();
-
testAddFromMeterProgrammable();
-
fallback.init(manager.deviceService, fallback.meterProviderService, manager.mastershipService, 1);
-
TestTools.assertAfter(2000, () -> {
- assertTrue("The meter was not added", manager.getAllMeters().size() == 1);
+ assertEquals("The meter was not added", 1, manager.getAllMeters().size());
Meter m = manager.getMeters(PROGRAMMABLE_DID).iterator().next();
assertEquals("incorrect state", MeterState.ADDED, m.state());
});
-
}
// Test cluster service
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
index 0b4e2a4..9aec726 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
@@ -21,9 +21,6 @@
import com.google.common.collect.Maps;
import org.apache.commons.lang.math.RandomUtils;
import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.behaviour.MeterQuery;
import org.onosproject.net.driver.DriverHandler;
@@ -87,11 +84,24 @@
private Logger log = getLogger(getClass());
+ // Meters map related objects
private static final String METERSTORE = "onos-meter-store";
- private static final String METERFEATURESSTORE = "onos-meter-features-store";
- private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
- private static final String METERIDSTORE = "onos-meters-id-store";
+ private ConsistentMap<MeterKey, MeterData> meters;
+ private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
+ // Meters features related objects
+ private static final String METERFEATURESSTORE = "onos-meter-features-store";
+ private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
+
+ // Meters id related objects
+ private static final String AVAILABLEMETERIDSTORE = "onos-meters-available-store";
+ // Available meter identifiers
+ private DistributedSet<MeterKey> availableMeterIds;
+ // Atomic counter map for generation of new identifiers;
+ private static final String METERIDSTORE = "onos-meters-id-store";
+ private AtomicCounterMap<DeviceId> meterIdGenerators;
+
+ // Serializer related objects
private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(MeterKey.class)
@@ -102,37 +112,18 @@
.register(MeterState.class)
.register(Meter.Unit.class)
.register(MeterFailReason.class);
-
private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
@Reference(cardinality = ReferenceCardinality.MANDATORY)
private StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- private MastershipService mastershipService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- private ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DriverService driverService;
- private ConsistentMap<MeterKey, MeterData> meters;
- private NodeId local;
-
- private ConsistentMap<MeterFeaturesKey, MeterFeatures> meterFeatures;
-
- private MapEventListener<MeterKey, MeterData> mapListener = new InternalMapEventListener();
-
+ // Local cache to handle async ops through futures.
private Map<MeterKey, CompletableFuture<MeterStoreResult>> futures =
Maps.newConcurrentMap();
- // Available meter identifiers
- private DistributedSet<MeterKey> availableMeterIds;
-
- // Atomic counter map for generation of new identifiers;
- private AtomicCounterMap<DeviceId> meterIdGenerators;
-
/**
* Defines possible selection strategies to reuse meter ids.
*/
@@ -146,19 +137,16 @@
*/
FIRST_FIT
}
-
private ReuseStrategy reuseStrategy = FIRST_FIT;
@Activate
public void activate() {
- local = clusterService.getLocalNode().id();
-
+ // Init meters map and setup the map listener
meters = storageService.<MeterKey, MeterData>consistentMapBuilder()
.withName(METERSTORE)
.withSerializer(serializer).build();
-
meters.addListener(mapListener);
-
+ // Init meter features map (meaningful only for OpenFlow protocol)
meterFeatures = storageService.<MeterFeaturesKey, MeterFeatures>consistentMapBuilder()
.withName(METERFEATURESSTORE)
.withSerializer(Serializer.using(KryoNamespaces.API,
@@ -169,19 +157,16 @@
Meter.Unit.class,
MeterFailReason.class,
MeterFeaturesFlag.class)).build();
-
// Init the set of the available ids
availableMeterIds = new DefaultDistributedSet<>(storageService.<MeterKey>setBuilder()
.withName(AVAILABLEMETERIDSTORE)
.withSerializer(Serializer.using(KryoNamespaces.API,
MeterKey.class)).build(),
DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
-
// Init atomic map counters
meterIdGenerators = storageService.<DeviceId>atomicCounterMapBuilder()
.withName(METERIDSTORE)
.withSerializer(Serializer.using(KryoNamespaces.API)).build();
-
log.info("Started");
}
@@ -199,10 +184,12 @@
// Store the future related to the operation
futures.put(key, future);
// Store the meter data
- MeterData data = new MeterData(meter, null, local);
+ MeterData data = new MeterData(meter, null);
try {
meters.put(key, data);
} catch (StorageException e) {
+ log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
+ e.getMessage(), e);
futures.remove(key);
future.completeExceptionally(e);
}
@@ -218,7 +205,7 @@
// Store the future related to the operation
futures.put(key, future);
// Create the meter data
- MeterData data = new MeterData(meter, null, local);
+ MeterData data = new MeterData(meter, null);
// Update the state of the meter. It will be pruned by observing
// that it has been removed from the dataplane.
try {
@@ -228,6 +215,8 @@
future.complete(MeterStoreResult.success());
}
} catch (StorageException e) {
+ log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
+ e.getMessage(), e);
futures.remove(key);
future.completeExceptionally(e);
}
@@ -237,11 +226,14 @@
@Override
public MeterStoreResult storeMeterFeatures(MeterFeatures meterfeatures) {
+ // Store meter features, this is done once for each device
MeterStoreResult result = MeterStoreResult.success();
MeterFeaturesKey key = MeterFeaturesKey.key(meterfeatures.deviceId());
try {
meterFeatures.putIfAbsent(key, meterfeatures);
} catch (StorageException e) {
+ log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
+ e.getMessage(), e);
result = MeterStoreResult.fail(TIMEOUT);
}
return result;
@@ -249,28 +241,34 @@
@Override
public MeterStoreResult deleteMeterFeatures(DeviceId deviceId) {
+ // Remove meter features - these ops are meaningful only for OpenFlow
MeterStoreResult result = MeterStoreResult.success();
MeterFeaturesKey key = MeterFeaturesKey.key(deviceId);
try {
meterFeatures.remove(key);
} catch (StorageException e) {
+ log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
+ e.getMessage(), e);
result = MeterStoreResult.fail(TIMEOUT);
}
return result;
}
@Override
+ // TODO Should we remove it ? We are not using it
public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
futures.put(key, future);
- MeterData data = new MeterData(meter, null, local);
+ MeterData data = new MeterData(meter, null);
try {
if (meters.computeIfPresent(key, (k, v) -> data) == null) {
future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
}
} catch (StorageException e) {
+ log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
+ e.getMessage(), e);
futures.remove(key);
future.completeExceptionally(e);
}
@@ -279,6 +277,7 @@
@Override
public void updateMeterState(Meter meter) {
+ // Update meter if present (stats workflow)
MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
meters.computeIfPresent(key, (k, v) -> {
DefaultMeter m = (DefaultMeter) v.meter();
@@ -291,10 +290,7 @@
m.setLife(meter.life());
// TODO: Prune if drops to zero.
m.setReferenceCount(meter.referenceCount());
- if (meter.referenceCount() == 0) {
- notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO, m));
- }
- return new MeterData(m, null, v.origin());
+ return new MeterData(m, null);
});
}
@@ -320,41 +316,42 @@
@Override
public void failedMeter(MeterOperation op, MeterFailReason reason) {
+ // Meter ops failed (got notification from the sb)
MeterKey key = MeterKey.key(op.meter().deviceId(), op.meter().id());
- meters.computeIfPresent(key, (k, v) ->
- new MeterData(v.meter(), reason, v.origin()));
+ meters.computeIfPresent(key, (k, v) -> new MeterData(v.meter(), reason));
}
@Override
public void deleteMeterNow(Meter m) {
- // Create the key
+ // Once we receive the ack from the sb
+ // create the key and remove definitely the meter
MeterKey key = MeterKey.key(m.deviceId(), m.id());
- // Remove the future
- futures.remove(key);
- // Remove the meter
- if (Versioned.valueOrNull(meters.remove(key)) != null) {
- // Free the id
- freeMeterId(m.deviceId(), m.id());
- // Finally notify the delegate
- notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, m));
+ try {
+ if (Versioned.valueOrNull(meters.remove(key)) != null) {
+ // Free the id
+ freeMeterId(m.deviceId(), m.id());
+ }
+ } catch (StorageException e) {
+ log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
+ e.getMessage(), e);
}
}
@Override
public void purgeMeter(DeviceId deviceId) {
-
+ // Purge api (typically used when the device is offline)
List<Versioned<MeterData>> metersPendingRemove = meters.stream()
.filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
-
+ // Remove definitely the meter
metersPendingRemove.forEach(versionedMeterKey
-> deleteMeterNow(versionedMeterKey.value().meter()));
-
}
@Override
public long getMaxMeters(MeterFeaturesKey key) {
+ // Leverage the meter features to know the max id
MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
return features == null ? 0L : features.maxMeter();
}
@@ -465,45 +462,45 @@
updateMeterIdAvailability(deviceId, meterId, true);
}
+ // Enabling the events distribution across the cluster
private class InternalMapEventListener implements MapEventListener<MeterKey, MeterData> {
@Override
public void event(MapEvent<MeterKey, MeterData> event) {
MeterKey key = event.key();
Versioned<MeterData> value = event.type() == MapEvent.Type.REMOVE ? event.oldValue() : event.newValue();
MeterData data = value.value();
- NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
+ MeterData oldData = Versioned.valueOrNull(event.oldValue());
switch (event.type()) {
case INSERT:
case UPDATE:
switch (data.meter().state()) {
case PENDING_ADD:
case PENDING_REMOVE:
- if (!data.reason().isPresent() && local.equals(master)) {
- notifyDelegate(
- new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
- MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
- data.meter()));
- } else if (data.reason().isPresent() && local.equals(data.origin())) {
- MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
- //TODO: No future -> no friend
- futures.get(key).complete(msr);
- }
- break;
- case ADDED:
- if (local.equals(data.origin()) &&
- (data.meter().state() == MeterState.PENDING_ADD
- || data.meter().state() == MeterState.ADDED)) {
+ // Two cases. If there is a reason, the meter operation failed.
+ // Otherwise, we are ready to install/remove through the delegate.
+ if (data.reason().isEmpty()) {
+ notifyDelegate(new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
+ MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ, data.meter()));
+ } else {
futures.computeIfPresent(key, (k, v) -> {
- v.complete(MeterStoreResult.success());
- notifyDelegate(
- new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
+ v.complete(MeterStoreResult.fail(data.reason().get()));
return null;
});
}
break;
- case REMOVED:
- if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
- futures.remove(key).complete(MeterStoreResult.success());
+ case ADDED:
+ // Transition from pending to installed
+ if (data.meter().state() == MeterState.ADDED &&
+ (oldData != null && oldData.meter().state() == MeterState.PENDING_ADD)) {
+ futures.computeIfPresent(key, (k, v) -> {
+ v.complete(MeterStoreResult.success());
+ return null;
+ });
+ notifyDelegate(new MeterEvent(MeterEvent.Type.METER_ADDED, data.meter()));
+ // Update stats case
+ } else if (data.meter().referenceCount() == 0) {
+ notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO,
+ data.meter()));
}
break;
default:
@@ -511,7 +508,13 @@
}
break;
case REMOVE:
- //Only happens at origin so we do not need to care.
+ // Meter removal case
+ futures.computeIfPresent(key, (k, v) -> {
+ v.complete(MeterStoreResult.success());
+ return null;
+ });
+ // Finally notify the delegate
+ notifyDelegate(new MeterEvent(MeterEvent.Type.METER_REMOVED, data.meter()));
break;
default:
log.warn("Unknown Map event type {}", event.type());
@@ -520,5 +523,4 @@
}
}
-
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/MeterData.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/MeterData.java
index 8f87c2c..f392122 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/MeterData.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/MeterData.java
@@ -30,12 +30,29 @@
private final Optional<MeterFailReason> reason;
private final NodeId origin;
+ /**
+ * Builds up a meter data.
+ * @param meter the meter
+ * @param reason the reason of the failure
+ * @param origin the node from which the request is originated
+ * @deprecated in ONOS 2.2
+ */
+ @Deprecated
public MeterData(Meter meter, MeterFailReason reason, NodeId origin) {
this.meter = meter;
this.reason = Optional.ofNullable(reason);
this.origin = origin;
}
+ /**
+ * Builds up a meter data.
+ * @param meter the meter
+ * @param reason the reason of the failure
+ */
+ public MeterData(Meter meter, MeterFailReason reason) {
+ this(meter, reason, null);
+ }
+
public Meter meter() {
return meter;
}
@@ -44,6 +61,12 @@
return this.reason;
}
+ /**
+ * Returns the origin node.
+ * @return the node id of the origin node
+ * @deprecated in ONOS 2.2
+ */
+ @Deprecated
public NodeId origin() {
return this.origin;
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
index 5005787..2566803 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
@@ -17,7 +17,6 @@
package org.onosproject.store.meter.impl;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -25,11 +24,7 @@
import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
import org.onosproject.TestApplicationId;
-import org.onosproject.cluster.ClusterServiceAdapter;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
-import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.net.DeviceId;
import org.onosproject.net.behaviour.MeterQuery;
import org.onosproject.net.driver.Behaviour;
@@ -52,7 +47,6 @@
import java.util.Collections;
import java.util.HashSet;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -127,8 +121,6 @@
meterStore = new DistributedMeterStore();
// Let's initialize some internal services
TestUtils.setField(meterStore, "storageService", new TestStorageService());
- TestUtils.setField(meterStore, "clusterService", new TestClusterService());
- TestUtils.setField(meterStore, "mastershipService", new TestMastershipService());
TestUtils.setField(meterStore, "driverService", new TestDriverService());
// Inject TestApplicationId into the DistributedMeterStore serializer
@@ -408,29 +400,19 @@
assertNull(meterStore.getMeter(keyOne));
}
- // Test cluster service
- private final class TestClusterService extends ClusterServiceAdapter {
-
- private ControllerNode local = new DefaultControllerNode(NID_LOCAL, LOCALHOST);
-
- @Override
- public ControllerNode getLocalNode() {
- return local;
- }
-
- @Override
- public Set<ControllerNode> getNodes() {
- return Sets.newHashSet();
- }
-
- }
-
- // Test mastership service
- private final class TestMastershipService extends MastershipServiceAdapter {
- @Override
- public NodeId getMasterFor(DeviceId deviceId) {
- return NID_LOCAL;
- }
+ /**
+ * Test purge meter.
+ */
+ @Test
+ public void testPurgeMeter() {
+ // add the meter
+ testStoreMeter();
+ meterStore.purgeMeter(did1);
+ // Verify delete
+ MeterKey keyOne = MeterKey.key(did1, mid1);
+ assertThat(0, is(meterStore.getAllMeters().size()));
+ assertThat(0, is(meterStore.getAllMeters(did1).size()));
+ assertNull(meterStore.getMeter(keyOne));
}
// Test class for driver service.