[AETHER-432][VOL-3263] Revisit meters subsystem to be fully distributed.
- Events are now propagated across the cluster.
- REF_COUNT_ZERO is now generated after the first stats interval.
- Southbound (SB) operations are offloaded to the meter installer threads.
- Installers use predictable executors.
- Deprecated the MeterData API that exposes the origin node, and removed unused code.
- Added comments to obscure parts of the code.
- MeterManager and DistributedMeterStore unit tests are improved.
- Fix an issue in TestConsistentMap.
Change-Id: I0329f903e5fdc421f29ee33f8f8133f18c794d8f
(cherry picked from commit 89c130115df20c80de446aac9c48e838468028d4)
diff --git a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
index 1979c61..49a889d 100644
--- a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
+++ b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
@@ -15,9 +15,13 @@
*/
package org.onosproject.net.meter.impl;
+import org.onlab.util.PredictableExecutor;
+import org.onlab.util.PredictableExecutor.PickyRunnable;
import org.onlab.util.Tools;
import org.onlab.util.TriConsumer;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.config.NetworkConfigRegistry;
@@ -59,12 +63,12 @@
import java.util.Collection;
import java.util.Dictionary;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
+import java.util.Objects;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.onlab.util.PredictableExecutor.newPredictableExecutor;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.OsgiPropertyConstants.MM_FALLBACK_METER_POLL_FREQUENCY;
@@ -93,10 +97,11 @@
public class MeterManager
extends AbstractListenerProviderRegistry<MeterEvent, MeterListener, MeterProvider, MeterProviderService>
implements MeterService, MeterProviderRegistry {
-
+ // Installer related objects
+ private PredictableExecutor meterInstallers;
private static final String WORKER_PATTERN = "installer-%d";
private static final String GROUP_THREAD_NAME = "onos/meter";
-
+ // Logging facility, meter store delegate and listener for device events.
private final Logger log = getLogger(getClass());
private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate();
private final DeviceListener deviceListener = new InternalDeviceListener();
@@ -119,7 +124,11 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected NetworkConfigRegistry netCfgService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
/** Number of worker threads. */
+ // TODO Set 0 to use the available processors
private int numThreads = MM_NUM_THREADS_DEFAULT;
/** Frequency (in seconds) for polling meters via fallback provider. */
@@ -128,19 +137,28 @@
/** Purge entries associated with a device when the device goes offline. */
private boolean purgeOnDisconnection = MM_PURGE_ON_DISCONNECTION_DEFAULT;
+ // Action triggered when the futures related to submit and withdrawal complete
private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
- private ExecutorService executorService;
-
+ // Meter provider reference
private final MeterDriverProvider defaultProvider = new MeterDriverProvider();
+ // Node id used to verify who is in charge of the meter ops
+ // (usually one node can modify the internal state of the device)
+ private NodeId local;
+
@Activate
public void activate(ComponentContext context) {
store.setDelegate(delegate);
cfgService.registerProperties(getClass());
eventDispatcher.addSink(MeterEvent.class, listenerRegistry);
deviceService.addListener(deviceListener);
-
+ local = clusterService.getLocalNode().id();
+ // Consumer logic is the following:
+ // if there is an exceptional end (storage exception), on error is called
+ // else if there is a reason for the failure, on error is called with the reason
+ // else if the reason is empty, on success is called
+ // In all the cases the meter context code is consumed
onComplete = (request, result, error) -> {
request.context().ifPresent(c -> {
if (error != null) {
@@ -153,12 +171,9 @@
}
}
});
-
};
modified(context);
- executorService = newFixedThreadPool(numThreads,
- groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
log.info("Started");
}
@@ -178,7 +193,7 @@
eventDispatcher.removeSink(MeterEvent.class);
deviceService.removeListener(deviceListener);
cfgService.unregisterProperties(getClass(), false);
- executorService.shutdown();
+ meterInstallers.shutdown();
log.info("Stopped");
}
@@ -215,6 +230,11 @@
} catch (NumberFormatException e) {
numThreads = MM_NUM_THREADS_DEFAULT;
}
+ if (meterInstallers != null) {
+ meterInstallers.shutdown();
+ }
+ meterInstallers = newPredictableExecutor(numThreads,
+ groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
}
@Override
@@ -230,19 +250,20 @@
@Override
public Meter submit(MeterRequest request) {
checkNotNull(request, "request cannot be null.");
+ // Allocate an id and then submit the request
MeterId id = allocateMeterId(request.deviceId());
-
Meter.Builder mBuilder = DefaultMeter.builder()
.forDevice(request.deviceId())
.fromApp(request.appId())
.withBands(request.bands())
.withCellId(id)
.withUnit(request.unit());
-
if (request.isBurst()) {
mBuilder.burst();
}
DefaultMeter m = (DefaultMeter) mBuilder.build();
+ // Meter installation logic (happy ending case)
+ // PENDING -> stats -> ADDED -> future completes
m.setState(MeterState.PENDING_ADD);
store.storeMeter(m).whenComplete((result, error) ->
onComplete.accept(request, result, error));
@@ -262,8 +283,10 @@
if (request.isBurst()) {
mBuilder.burst();
}
-
DefaultMeter m = (DefaultMeter) mBuilder.build();
+ // Meter removal logic (happy ending case)
+ // PENDING -> stats -> removed from the map -> future completes
+ // There is no transition to the REMOVED state
m.setState(MeterState.PENDING_REMOVE);
store.deleteMeter(m).whenComplete((result, error) ->
onComplete.accept(request, result, error));
@@ -288,19 +311,19 @@
@Override
public MeterId allocateMeterId(DeviceId deviceId) {
- // We delegate direclty to the store
+ // We delegate directly to the store
return store.allocateMeterId(deviceId);
}
@Override
public void freeMeterId(DeviceId deviceId, MeterId meterId) {
- // We delegate direclty to the store
+ // We delegate directly to the store
store.freeMeterId(deviceId, meterId);
}
@Override
public void purgeMeters(DeviceId deviceId) {
- // We delegate direclty to the store
+ // We delegate directly to the store
store.purgeMeter(deviceId);
}
@@ -325,6 +348,7 @@
@Override
public void pushMeterMetrics(DeviceId deviceId, Collection<Meter> meterEntries) {
+ // Each update on the store is reflected on this collection
Collection<Meter> allMeters = store.getAllMeters(deviceId);
Map<MeterId, Meter> meterEntriesMap = meterEntries.stream()
@@ -337,8 +361,8 @@
!meterEntriesMap.containsKey(m.id())) {
// The meter is missing in the device. Reinstall!
log.debug("Adding meter missing in device {} {}", deviceId, m);
- provider().performMeterOperation(deviceId,
- new MeterOperation(m, MeterOperation.Type.ADD));
+ // offload the task to avoid the overloading of the sb threads
+ meterInstallers.execute(new MeterInstaller(deviceId, m, MeterOperation.Type.ADD));
}
});
@@ -347,7 +371,7 @@
.filter(md -> !allMeters.stream().anyMatch(m -> m.id().equals(md.getKey())))
.forEach(mio -> {
Meter meter = mio.getValue();
- // FIXME: Removing a meter is meaningfull for OpenFlow, but not for P4Runtime.
+ // FIXME: Removing a meter is meaningful for OpenFlow, but not for P4Runtime.
// In P4Runtime meter cells cannot be removed. For the
// moment, we make the distinction between OpenFlow and
// P4Runtime by looking at the MeterCellType (always
@@ -355,21 +379,29 @@
if (meter.meterCellId().type() == MeterCellType.INDEX) {
// The meter is missing in onos. Uninstall!
log.debug("Remove meter in device not in onos {} {}", deviceId, mio.getKey());
- provider().performMeterOperation(deviceId,
- new MeterOperation(meter, MeterOperation.Type.REMOVE));
+ // offload the task to avoid the overloading of the sb threads
+ meterInstallers.execute(new MeterInstaller(deviceId, meter, MeterOperation.Type.REMOVE));
}
});
+ // Update the meter stats in the store (first time move the state from pending to added)
meterEntries.stream()
.filter(m -> allMeters.stream()
.anyMatch(sm -> sm.deviceId().equals(deviceId) && sm.id().equals(m.id())))
.forEach(m -> store.updateMeterState(m));
allMeters.forEach(m -> {
+ // FIXME: Installing a meter is meaningful for OpenFlow, but not for P4Runtime.
+ // It looks like this flow is used only for p4runtime to emulate the installation
+ // since meters are already instantiated - we just need to modify the params.
if (m.state() == MeterState.PENDING_ADD) {
- provider().performMeterOperation(m.deviceId(),
- new MeterOperation(m,
- MeterOperation.Type.MODIFY));
+ // offload the task to avoid the overloading of the sb threads
+ meterInstallers.execute(new MeterInstaller(m.deviceId(), m, MeterOperation.Type.MODIFY));
+ // Remove workflow. Regarding OpenFlow, meters have been removed from
+ // the device but they are still in the store, we will purge them definitely.
+ // Instead, P4Runtime devices will not remove the meter. The first workaround
+ // for P4Runtime avoids sending a remove op. Then, we reach this point
+ // and we purge the meter from the store
} else if (m.state() == MeterState.PENDING_REMOVE) {
store.deleteMeterNow(m);
}
@@ -393,25 +425,25 @@
public void notify(MeterEvent event) {
DeviceId deviceId = event.subject().deviceId();
switch (event.type()) {
+ // REQ events will trigger a modification in the device.
+ // Mastership check is performed inside the installer
+ // to avoid the blocking of the RAFT threads
case METER_ADD_REQ:
- executorService.execute(new MeterInstaller(deviceId, event.subject(),
+ meterInstallers.execute(new MeterInstaller(deviceId, event.subject(),
MeterOperation.Type.ADD));
break;
case METER_REM_REQ:
- executorService.execute(new MeterInstaller(deviceId, event.subject(),
+ meterInstallers.execute(new MeterInstaller(deviceId, event.subject(),
MeterOperation.Type.REMOVE));
break;
+ // Following events are forwarded to the apps subscribed for the meter events;
+ // installers are not involved in this task. In this case, the overhead for this op
+ // is negligible. If needed, a dedicated store delegate thread could be introduced.
case METER_ADDED:
- log.info("Meter added {}", event.subject());
- post(new MeterEvent(MeterEvent.Type.METER_ADDED, event.subject()));
- break;
case METER_REMOVED:
- log.info("Meter removed {}", event.subject());
- post(new MeterEvent(MeterEvent.Type.METER_REMOVED, event.subject()));
- break;
case METER_REFERENCE_COUNT_ZERO:
- log.debug("Meter reference count zero {}", event.subject());
- post(new MeterEvent(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO, event.subject()));
+ log.debug("Post {} event {}", event.type(), event.subject());
+ post(event);
break;
default:
log.warn("Unknown meter event {}", event.type());
@@ -419,10 +451,11 @@
}
}
+
/**
* Task that passes the meter down to the provider.
*/
- private class MeterInstaller implements Runnable {
+ private class MeterInstaller implements PickyRunnable {
private final DeviceId deviceId;
private final Meter meter;
private final MeterOperation.Type op;
@@ -435,6 +468,14 @@
@Override
public void run() {
+ // Check mastership and eventually execute the op on the device
+ log.debug("Meter {} request {}", op.name().toLowerCase(), meter);
+ NodeId master = mastershipService.getMasterFor(meter.deviceId());
+ if (!Objects.equals(local, master)) {
+ log.trace("Not the master of device {}, skipping installation of the meter {}",
+ meter.deviceId(), meter.id());
+ return;
+ }
MeterProvider p = getProvider(this.deviceId);
if (p == null) {
log.error("Unable to recover {}'s provider", deviceId);
@@ -442,6 +483,11 @@
}
p.performMeterOperation(deviceId, new MeterOperation(meter, op));
}
+
+ @Override
+ public int hint() {
+ return meter.id().hashCode();
+ }
}
private class InternalDeviceListener implements DeviceListener {