[ONOS-7116] Implement MeterProgrammable and MeterDriverProvider

Change-Id: I398edda11a6b77b66d79758cf3afab42976e8ff3
diff --git a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterDriverProvider.java b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterDriverProvider.java
new file mode 100644
index 0000000..e6151b0
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterDriverProvider.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.meter.impl;
+
+import com.google.common.collect.Sets;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterOperation;
+import org.onosproject.net.meter.MeterOperations;
+import org.onosproject.net.meter.MeterProgrammable;
+import org.onosproject.net.meter.MeterProvider;
+
+import org.onosproject.net.meter.MeterProviderService;
+import org.onosproject.net.provider.AbstractProvider;
+import org.onosproject.net.provider.ProviderId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_ADDED;
+import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
+
+/**
+ * Driver-based Meter provider.
+ */
+public class MeterDriverProvider extends AbstractProvider implements MeterProvider {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    // To be extracted for reuse as we deal with other.
+    private static final String SCHEME = "default";
+    private static final String PROVIDER_NAME = "org.onosproject.provider.meter";
+
+    // potentially positive device event
+    private static final Set<DeviceEvent.Type> POSITIVE_DEVICE_EVENT =
+            Sets.immutableEnumSet(DEVICE_ADDED,
+                    DEVICE_AVAILABILITY_CHANGED);
+
+    protected DeviceService deviceService;
+    protected MastershipService mastershipService;
+    MeterProviderService meterProviderService;
+    int pollFrequency;
+
+    private InternalDeviceListener deviceListener = new InternalDeviceListener();
+    private ScheduledExecutorService executor
+            = newSingleThreadScheduledExecutor(groupedThreads("MeterDriverProvider", "%d", log));
+    private ScheduledFuture<?> poller = null;
+
+    public MeterDriverProvider() {
+        super(new ProviderId(SCHEME, PROVIDER_NAME));
+    }
+
+    /**
+     * Initializes the provider with the necessary device service, meter provider service,
+     * mastership service and poll frequency.
+     *
+     * @param deviceService        device service
+     * @param meterProviderService meter provider service
+     * @param mastershipService    mastership service
+     * @param pollFrequency        meter entry poll frequency
+     */
+    void init(DeviceService deviceService, MeterProviderService meterProviderService,
+              MastershipService mastershipService, int pollFrequency) {
+        this.deviceService = deviceService;
+        this.meterProviderService = meterProviderService;
+        this.mastershipService = mastershipService;
+        this.pollFrequency = pollFrequency;
+
+        deviceService.addListener(deviceListener);
+
+        if (poller != null && !poller.isCancelled()) {
+            poller.cancel(false);
+        }
+
+        poller = executor.scheduleAtFixedRate(this::pollMeters, pollFrequency,
+                pollFrequency, TimeUnit.SECONDS);
+
+    }
+
+    void terminate() {
+        deviceService.removeListener(deviceListener);
+        deviceService = null;
+        meterProviderService = null;
+        mastershipService = null;
+        poller.cancel(true);
+        executor.shutdown();
+    }
+
+    private void pollMeters() {
+        deviceService.getAvailableDevices().forEach(device -> {
+            if (mastershipService.isLocalMaster(device.id()) &&
+                    device.is(MeterProgrammable.class)) {
+                pollDeviceMeters(device.id());
+            }
+        });
+    }
+
+    @Override
+    public void performMeterOperation(DeviceId deviceId, MeterOperations meterOps) {
+        meterOps.operations().forEach(meterOperation -> performMeterOperation(deviceId, meterOperation));
+    }
+
+    @Override
+    public void performMeterOperation(DeviceId deviceId, MeterOperation meterOp) {
+        MeterProgrammable programmable = getMeterProgrammable(deviceId);
+        if (programmable != null) {
+            programmable.performMeterOperation(meterOp);
+        }
+    }
+
+    private void pollDeviceMeters(DeviceId deviceId) {
+        Collection<Meter> meters = null;
+        try {
+            meters = getMeterProgrammable(deviceId).getMeters().get(pollFrequency, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.warn("Unable to get the Meters from {}, error: {}", deviceId, e.getMessage());
+            log.debug("Exception: ", e);
+        }
+        meterProviderService.pushMeterMetrics(deviceId, meters);
+    }
+
+    private MeterProgrammable getMeterProgrammable(DeviceId deviceId) {
+        Device device = deviceService.getDevice(deviceId);
+        if (device.is(MeterProgrammable.class)) {
+            return device.as(MeterProgrammable.class);
+        } else {
+            log.debug("Device {} is not meter programmable", deviceId);
+            return null;
+        }
+    }
+
+    private class InternalDeviceListener implements DeviceListener {
+
+        @Override
+        public void event(DeviceEvent event) {
+            executor.execute(() -> handleEvent(event));
+        }
+
+        @Override
+        public boolean isRelevant(DeviceEvent event) {
+            Device device = event.subject();
+            return POSITIVE_DEVICE_EVENT.contains(event.type()) &&
+                    device.is(MeterProgrammable.class);
+        }
+
+        private void handleEvent(DeviceEvent event) {
+            Device device = event.subject();
+            boolean isRelevant = mastershipService.isLocalMaster(device.id()) &&
+                    deviceService.isAvailable(device.id());
+
+            if (isRelevant) {
+                pollDeviceMeters(device.id());
+            }
+        }
+    }
+}