Adding purgeOnDisconnect support to the meter subsystem and adding vlanId match
criteria to EAPOL trap flows.

* Adding purgeOnDisconnect property to MeterManager
* DeviceListener implementation on MeterManager
* Adding purgeMeter(DeviceId deviceId) method to MeterStore
* Calling the above method when DEVICE_AVAILABILITY_CHANGE is received
* Adding vlanId match criteria to EAPOL trap flows (OltPipeline change)

Change-Id: Ibb254302efe94edf1fd596f74a6eef6587410475
(cherry picked from commit 91b38543d822a0d9d092f9b3ff7760b1a206226a)
diff --git a/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java b/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
index 9b5eae8..3f0efa0 100644
--- a/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
+++ b/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
@@ -102,13 +102,14 @@
      * Notifies the delegate that the meter failed to allow it
      * to nofity the app.
      *
-     * @param op a failed meter operation
+     * @param op     a failed meter operation
      * @param reason a failure reason
      */
     void failedMeter(MeterOperation op, MeterFailReason reason);
 
     /**
      * Delete this meter immediately.
+     *
      * @param m a meter
      */
     void deleteMeterNow(Meter m);
@@ -134,8 +135,15 @@
      * Frees the given meter id.
      *
      * @param deviceId the device id
-     * @param meterId the id to be freed
+     * @param meterId  the id to be freed
      */
     void freeMeterId(DeviceId deviceId, MeterId meterId);
 
+    /**
+     * Removes all meters of given device from store.
+     *
+     * @param deviceId the device id
+     */
+    void purgeMeter(DeviceId deviceId);
+
 }
diff --git a/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java b/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java
index 6afb15c..bcf9d2f 100644
--- a/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java
+++ b/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java
@@ -119,6 +119,9 @@
     public static final String MM_FALLBACK_METER_POLL_FREQUENCY = "fallbackMeterPollFrequency";
     public static final int MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT = 30;
 
+    public static final String MM_PURGE_ON_DISCONNECTION = "purgeOnDisconnection";
+    public static final boolean MM_PURGE_ON_DISCONNECTION_DEFAULT = false;
+
     public static final String NRM_ARP_ENABLED = "arpEnabled";
     public static final boolean NRM_ARP_ENABLED_DEFAULT = true;
 
diff --git a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
index 9bc1a6c..cdc3244 100644
--- a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
+++ b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
@@ -15,10 +15,13 @@
  */
 package org.onosproject.net.meter.impl;
 
+import org.onlab.util.Tools;
 import org.onlab.util.TriConsumer;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.driver.DriverService;
 import org.onosproject.net.meter.DefaultMeter;
@@ -66,21 +69,24 @@
 import static org.onosproject.net.OsgiPropertyConstants.MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT;
 import static org.onosproject.net.OsgiPropertyConstants.MM_NUM_THREADS;
 import static org.onosproject.net.OsgiPropertyConstants.MM_NUM_THREADS_DEFAULT;
+import static org.onosproject.net.OsgiPropertyConstants.MM_PURGE_ON_DISCONNECTION;
+import static org.onosproject.net.OsgiPropertyConstants.MM_PURGE_ON_DISCONNECTION_DEFAULT;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Provides implementation of the meter service APIs.
  */
 @Component(
-    immediate = true,
-    service = {
-        MeterService.class,
-        MeterProviderRegistry.class
-    },
-    property = {
-        MM_NUM_THREADS + ":Integer=" + MM_NUM_THREADS_DEFAULT,
-        MM_FALLBACK_METER_POLL_FREQUENCY + ":Integer=" + MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT
-    }
+        immediate = true,
+        service = {
+                MeterService.class,
+                MeterProviderRegistry.class
+        },
+        property = {
+                MM_NUM_THREADS + ":Integer=" + MM_NUM_THREADS_DEFAULT,
+                MM_FALLBACK_METER_POLL_FREQUENCY + ":Integer=" + MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT,
+                MM_PURGE_ON_DISCONNECTION + ":Boolean=" + MM_PURGE_ON_DISCONNECTION_DEFAULT,
+        }
 )
 public class MeterManager
         extends AbstractListenerProviderRegistry<MeterEvent, MeterListener, MeterProvider, MeterProviderService>
@@ -91,6 +97,7 @@
 
     private final Logger log = getLogger(getClass());
     private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate();
+    private final DeviceListener deviceListener = new InternalDeviceListener();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     private MeterStore store;
@@ -113,6 +120,9 @@
     /** Frequency (in seconds) for polling meters via fallback provider. */
     private int fallbackMeterPollFrequency = MM_FALLBACK_METER_POLL_FREQUENCY_DEFAULT;
 
+    /** Purge entries associated with a device when the device goes offline. */
+    private boolean purgeOnDisconnection = MM_PURGE_ON_DISCONNECTION_DEFAULT;
+
     private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
 
     private ExecutorService executorService;
@@ -124,6 +134,7 @@
         store.setDelegate(delegate);
         cfgService.registerProperties(getClass());
         eventDispatcher.addSink(MeterEvent.class, listenerRegistry);
+        deviceService.addListener(deviceListener);
 
         onComplete = (request, result, error) -> {
                 request.context().ifPresent(c -> {
@@ -160,6 +171,7 @@
         defaultProvider.terminate();
         store.unsetDelegate(delegate);
         eventDispatcher.removeSink(MeterEvent.class);
+        deviceService.removeListener(deviceListener);
         cfgService.unregisterProperties(getClass(), false);
         executorService.shutdown();
         log.info("Stopped");
@@ -172,6 +184,17 @@
      */
     private void readComponentConfiguration(ComponentContext context) {
         Dictionary<?, ?> properties = context.getProperties();
+        Boolean flag;
+
+        flag = Tools.isPropertyEnabled(properties, MM_PURGE_ON_DISCONNECTION);
+        if (flag == null) {
+            log.info("PurgeOnDisconnection is not configured," +
+                    "using current value of {}", purgeOnDisconnection);
+        } else {
+            purgeOnDisconnection = flag;
+            log.info("Configured. PurgeOnDisconnection is {}",
+                    purgeOnDisconnection ? "enabled" : "disabled");
+        }
 
         String s = get(properties, MM_FALLBACK_METER_POLL_FREQUENCY);
         try {
@@ -410,4 +433,24 @@
         }
     }
 
+    private class InternalDeviceListener implements DeviceListener {
+
+        @Override
+        public void event(DeviceEvent event) {
+            switch (event.type()) {
+                case DEVICE_REMOVED:
+                case DEVICE_AVAILABILITY_CHANGED:
+                    DeviceId deviceId = event.subject().id();
+                    if (!deviceService.isAvailable(deviceId)) {
+                        if (purgeOnDisconnection) {
+                            store.purgeMeter(deviceId);
+                        }
+                    }
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
index 99bbac1..0646e66 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
@@ -66,7 +66,9 @@
 import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
@@ -335,6 +337,19 @@
     }
 
     @Override
+    public void purgeMeter(DeviceId deviceId) {
+
+        List<Versioned<MeterData>> metersPendingRemove = meters.stream()
+                .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
+                .map(Map.Entry::getValue)
+                .collect(Collectors.toList());
+
+        metersPendingRemove.forEach(versionedMeterKey
+                -> deleteMeterNow(versionedMeterKey.value().meter()));
+
+    }
+
+    @Override
     public long getMaxMeters(MeterFeaturesKey key) {
         MeterFeatures features = Versioned.valueOrElse(meterFeatures.get(key), null);
         return features == null ? 0L : features.maxMeter();
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
index 0ccbc4d..a54a140 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
@@ -795,7 +795,9 @@
         Instruction meter = filter.meta().metered();
         Instruction writeMetadata = filter.meta().writeMetadata();
 
-        TrafficSelector selector = buildSelector(filter.key(), ethType);
+        Criterion vlanId = filterForCriterion(filter.conditions(), Criterion.Type.VLAN_VID);
+
+        TrafficSelector selector = buildSelector(filter.key(), ethType, vlanId);
         TrafficTreatment treatment = buildTreatment(output, meter, writeMetadata);
         buildAndApplyRule(filter, selector, treatment);
 
@@ -904,19 +906,15 @@
 
     private TrafficSelector buildSelector(Criterion... criteria) {
 
-
         TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
 
-        for (Criterion c : criteria) {
-            sBuilder.add(c);
-        }
+        Arrays.stream(criteria).filter(Objects::nonNull).forEach(sBuilder::add);
 
         return sBuilder.build();
     }
 
     private TrafficTreatment buildTreatment(Instruction... instructions) {
 
-
         TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
 
         Arrays.stream(instructions).filter(Objects::nonNull).forEach(tBuilder::add);