Adding purgeOnDisconnect support to the meter subsystem and adding vlanId match
criteria to EAPOL trap flows.
* Adding purgeOnDisconnect property to MeterManager
* DeviceListener implementation on MeterManager
* Adding purgeMeter(DeviceId deviceId) method to MeterStore
* Calling the above method when DEVICE_AVAILABILITY_CHANGE is received
* Adding vlanId match criteria to EAPOL trap flows (OltPipeline change)
Change-Id: Ibb254302efe94edf1fd596f74a6eef6587410475
(cherry picked from commit 91b38543d822a0d9d092f9b3ff7760b1a206226a)
diff --git a/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java b/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
index 9b5eae8..3f0efa0 100644
--- a/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
+++ b/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
@@ -102,13 +102,14 @@
* Notifies the delegate that the meter failed to allow it
* to nofity the app.
*
- * @param op a failed meter operation
+ * @param op a failed meter operation
* @param reason a failure reason
*/
void failedMeter(MeterOperation op, MeterFailReason reason);
/**
* Delete this meter immediately.
+ *
* @param m a meter
*/
void deleteMeterNow(Meter m);
@@ -134,8 +135,15 @@
* Frees the given meter id.
*
* @param deviceId the device id
- * @param meterId the id to be freed
+ * @param meterId the id to be freed
*/
void freeMeterId(DeviceId deviceId, MeterId meterId);
+ /**
+ * Removes all meters of given device from store.
+ *
+ * @param deviceId the device id
+ */
+ void purgeMeter(DeviceId deviceId);
+
}
diff --git a/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java b/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java
index 6afb15c..bcf9d2f 100644
--- a/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java
+++ b/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java
@@ -119,6 +119,9 @@
public static final String MM_FALLBACK_METER_POLL_FREQUENCY = "fallbackMeterPollFrequency";
public static final int MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT = 30;
+ public static final String MM_PURGE_ON_DISCONNECTION = "purgeOnDisconnection";
+ public static final boolean MM_PURGE_ON_DISCONNECTION_DEFAULT = false;
+
public static final String NRM_ARP_ENABLED = "arpEnabled";
public static final boolean NRM_ARP_ENABLED_DEFAULT = true;
diff --git a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
index 9bc1a6c..cdc3244 100644
--- a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
+++ b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
@@ -15,10 +15,13 @@
*/
package org.onosproject.net.meter.impl;
+import org.onlab.util.Tools;
import org.onlab.util.TriConsumer;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.meter.DefaultMeter;
@@ -66,21 +69,24 @@
import static org.onosproject.net.OsgiPropertyConstants.MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT;
import static org.onosproject.net.OsgiPropertyConstants.MM_NUM_THREADS;
import static org.onosproject.net.OsgiPropertyConstants.MM_NUM_THREADS_DEFAULT;
+import static org.onosproject.net.OsgiPropertyConstants.MM_PURGE_ON_DISCONNECTION;
+import static org.onosproject.net.OsgiPropertyConstants.MM_PURGE_ON_DISCONNECTION_DEFAULT;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Provides implementation of the meter service APIs.
*/
@Component(
- immediate = true,
- service = {
- MeterService.class,
- MeterProviderRegistry.class
- },
- property = {
- MM_NUM_THREADS + ":Integer=" + MM_NUM_THREADS_DEFAULT,
- MM_FALLBACK_METER_POLL_FREQUENCY + ":Integer=" + MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT
- }
+ immediate = true,
+ service = {
+ MeterService.class,
+ MeterProviderRegistry.class
+ },
+ property = {
+ MM_NUM_THREADS + ":Integer=" + MM_NUM_THREADS_DEFAULT,
+ MM_FALLBACK_METER_POLL_FREQUENCY + ":Integer=" + MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT,
+ MM_PURGE_ON_DISCONNECTION + ":Boolean=" + MM_PURGE_ON_DISCONNECTION_DEFAULT,
+ }
)
public class MeterManager
extends AbstractListenerProviderRegistry<MeterEvent, MeterListener, MeterProvider, MeterProviderService>
@@ -91,6 +97,7 @@
private final Logger log = getLogger(getClass());
private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate();
+ private final DeviceListener deviceListener = new InternalDeviceListener();
@Reference(cardinality = ReferenceCardinality.MANDATORY)
private MeterStore store;
@@ -113,6 +120,9 @@
/** Frequency (in seconds) for polling meters via fallback provider. */
private int fallbackMeterPollFrequency = MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT;
+ /** Purge entries associated with a device when the device goes offline. */
+ private boolean purgeOnDisconnection = MM_PURGE_ON_DISCONNECTION_DEFAULT;
+
private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
private ExecutorService executorService;
@@ -124,6 +134,7 @@
store.setDelegate(delegate);
cfgService.registerProperties(getClass());
eventDispatcher.addSink(MeterEvent.class, listenerRegistry);
+ deviceService.addListener(deviceListener);
onComplete = (request, result, error) -> {
request.context().ifPresent(c -> {
@@ -160,6 +171,7 @@
defaultProvider.terminate();
store.unsetDelegate(delegate);
eventDispatcher.removeSink(MeterEvent.class);
+ deviceService.removeListener(deviceListener);
cfgService.unregisterProperties(getClass(), false);
executorService.shutdown();
log.info("Stopped");
@@ -172,6 +184,17 @@
*/
private void readComponentConfiguration(ComponentContext context) {
Dictionary<?, ?> properties = context.getProperties();
+ Boolean flag;
+
+ flag = Tools.isPropertyEnabled(properties, MM_PURGE_ON_DISCONNECTION);
+ if (flag == null) {
+ log.info("PurgeOnDisconnection is not configured," +
+ "using current value of {}", purgeOnDisconnection);
+ } else {
+ purgeOnDisconnection = flag;
+ log.info("Configured. PurgeOnDisconnection is {}",
+ purgeOnDisconnection ? "enabled" : "disabled");
+ }
String s = get(properties, MM_FALLBACK_METER_POLL_FREQUENCY);
try {
@@ -410,4 +433,24 @@
}
}
+ private class InternalDeviceListener implements DeviceListener {
+
+ @Override
+ public void event(DeviceEvent event) {
+ switch (event.type()) {
+ case DEVICE_REMOVED:
+ case DEVICE_AVAILABILITY_CHANGED:
+ DeviceId deviceId = event.subject().id();
+ if (!deviceService.isAvailable(deviceId)) {
+ if (purgeOnDisconnection) {
+ store.purgeMeter(deviceId);
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
index 99bbac1..0646e66 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
@@ -66,7 +66,9 @@
import org.slf4j.Logger;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@@ -335,6 +337,19 @@
}
@Override
+ public void purgeMeter(DeviceId deviceId) {
+
+ List<Versioned<MeterData>> metersPendingRemove = meters.stream()
+ .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+
+ metersPendingRemove.forEach(versionedMeterKey
+ -> deleteMeterNow(versionedMeterKey.value().meter()));
+
+ }
+
+ @Override
public long getMaxMeters(MeterFeaturesKey key) {
MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
return features == null ? 0L : features.maxMeter();
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
index 0ccbc4d..a54a140 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
@@ -795,7 +795,9 @@
Instruction meter = filter.meta().metered();
Instruction writeMetadata = filter.meta().writeMetadata();
- TrafficSelector selector = buildSelector(filter.key(), ethType);
+ Criterion vlanId = filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
+
+ TrafficSelector selector = buildSelector(filter.key(), ethType, vlanId);
TrafficTreatment treatment = buildTreatment(output, meter, writeMetadata);
buildAndApplyRule(filter, selector, treatment);
@@ -904,19 +906,15 @@
private TrafficSelector buildSelector(Criterion... criteria) {
-
TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
- for (Criterion c : criteria) {
- sBuilder.add(c);
- }
+ Arrays.stream(criteria).filter(Objects::nonNull).forEach(sBuilder::add);
return sBuilder.build();
}
private TrafficTreatment buildTreatment(Instruction... instructions) {
-
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
Arrays.stream(instructions).filter(Objects::nonNull).forEach(tBuilder::add);