Adding purgeOnDisconnect support to the meter subsystem and adding vlanId match
criteria to EAPOL trap flows.
* Adding purgeOnDisconnect property to MeterManager
* DeviceListener implementation on MeterManager
* Adding purgeMeter(DeviceId deviceId) method to MeterStore
* Calling the above method when DEVICE_AVAILABILITY_CHANGE is received
* Adding vlanId match criteria to EAPOL trap flows (OltPipeline change)
Change-Id: Ibb254302efe94edf1fd596f74a6eef6587410475
diff --git a/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java b/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
index 9b5eae8..3f0efa0 100644
--- a/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
+++ b/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
@@ -102,13 +102,14 @@
* Notifies the delegate that the meter failed to allow it
* to nofity the app.
*
- * @param op a failed meter operation
+ * @param op a failed meter operation
* @param reason a failure reason
*/
void failedMeter(MeterOperation op, MeterFailReason reason);
/**
* Delete this meter immediately.
+ *
* @param m a meter
*/
void deleteMeterNow(Meter m);
@@ -134,8 +135,15 @@
* Frees the given meter id.
*
* @param deviceId the device id
- * @param meterId the id to be freed
+ * @param meterId the id to be freed
*/
void freeMeterId(DeviceId deviceId, MeterId meterId);
+ /**
+ * Removes all meters of given device from store.
+ *
+ * @param deviceId the device id
+ */
+ void purgeMeter(DeviceId deviceId);
+
}
diff --git a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
index d0dcdea..1f39338 100644
--- a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
+++ b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
@@ -23,10 +23,13 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
import org.onlab.util.TriConsumer;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.meter.DefaultMeter;
@@ -87,6 +90,7 @@
private final Logger log = getLogger(getClass());
private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate();
+ private final DeviceListener deviceListener = new InternalDeviceListener();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MeterStore store;
@@ -108,6 +112,10 @@
label = "Frequency (in seconds) for polling meters via fallback provider")
private int fallbackMeterPollFrequency = DEFAULT_POLL_FREQUENCY;
+ @Property(name = "purgeOnDisconnection", boolValue = false,
+ label = "Purge entries associated with a device when the device goes offline")
+ private boolean purgeOnDisconnection = false;
+
private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
private ExecutorService executorService;
@@ -119,6 +127,7 @@
store.setDelegate(delegate);
cfgService.registerProperties(getClass());
eventDispatcher.addSink(MeterEvent.class, listenerRegistry);
+ deviceService.addListener(deviceListener);
onComplete = (request, result, error) -> {
request.context().ifPresent(c -> {
@@ -155,6 +164,7 @@
defaultProvider.terminate();
store.unsetDelegate(delegate);
eventDispatcher.removeSink(MeterEvent.class);
+ deviceService.removeListener(deviceListener);
cfgService.unregisterProperties(getClass(), false);
executorService.shutdown();
log.info("Stopped");
@@ -167,6 +177,17 @@
*/
private void readComponentConfiguration(ComponentContext context) {
Dictionary<?, ?> properties = context.getProperties();
+ Boolean flag;
+
+ flag = Tools.isPropertyEnabled(properties, "purgeOnDisconnection");
+ if (flag == null) {
+ log.info("PurgeOnDisconnection is not configured," +
+ "using current value of {}", purgeOnDisconnection);
+ } else {
+ purgeOnDisconnection = flag;
+ log.info("Configured. PurgeOnDisconnection is {}",
+ purgeOnDisconnection ? "enabled" : "disabled");
+ }
String s = get(properties, "fallbackMeterPollFrequency");
try {
@@ -396,4 +417,24 @@
}
}
+ private class InternalDeviceListener implements DeviceListener {
+
+ @Override
+ public void event(DeviceEvent event) {
+ switch (event.type()) {
+ case DEVICE_REMOVED:
+ case DEVICE_AVAILABILITY_CHANGED:
+ DeviceId deviceId = event.subject().id();
+ if (!deviceService.isAvailable(deviceId)) {
+ if (purgeOnDisconnection) {
+ store.purgeMeter(deviceId);
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
}
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
index 340b4b8..d5b5ac1 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
@@ -789,7 +789,9 @@
Instruction meter = filter.meta().metered();
Instruction writeMetadata = filter.meta().writeMetadata();
- TrafficSelector selector = buildSelector(filter.key(), ethType);
+ Criterion vlanId = filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
+
+ TrafficSelector selector = buildSelector(filter.key(), ethType, vlanId);
TrafficTreatment treatment = buildTreatment(output, meter, writeMetadata);
buildAndApplyRule(filter, selector, treatment);
@@ -898,19 +900,15 @@
private TrafficSelector buildSelector(Criterion... criteria) {
-
TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
- for (Criterion c : criteria) {
- sBuilder.add(c);
- }
+ Arrays.stream(criteria).filter(Objects::nonNull).forEach(sBuilder::add);
return sBuilder.build();
}
private TrafficTreatment buildTreatment(Instruction... instructions) {
-
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
Arrays.stream(instructions).filter(Objects::nonNull).forEach(tBuilder::add);
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
index fa0dea8..4c3b220 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
@@ -67,7 +67,9 @@
import org.slf4j.Logger;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@@ -336,6 +338,19 @@
}
@Override
+ public void purgeMeter(DeviceId deviceId) {
+
+ List<Versioned<MeterData>> metersPendingRemove = meters.stream()
+ .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+
+ metersPendingRemove.forEach(versionedMeterKey
+ -> deleteMeterNow(versionedMeterKey.value().meter()));
+
+ }
+
+ @Override
public long getMaxMeters(MeterFeaturesKey key) {
MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
return features == null ? 0L : features.maxMeter();