[ONOS-3536] Implement back-end metrics saving logic using RRD

Change-Id: I1b3c495380884571dc88d2f9fb3152fdf41ef655
diff --git a/apps/cpman/api/src/main/java/org/onosproject/cpman/MetricsDatabase.java b/apps/cpman/api/src/main/java/org/onosproject/cpman/MetricsDatabase.java
new file mode 100644
index 0000000..a0e3679
--- /dev/null
+++ b/apps/cpman/api/src/main/java/org/onosproject/cpman/MetricsDatabase.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cpman;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Database for storing a metric.
+ */
+public interface MetricsDatabase {
+    /**
+     * Returns the metric name of this database.
+     *
+     * @return metric name
+     */
+    String metricName();
+
+    /**
+     * Update metric value by specifying metric type.
+     *
+     * @param metricType metric type (e.g., load, usage, etc.)
+     * @param value metric value
+     */
+    void updateMetric(String metricType, double value);
+
+    /**
+     * Update metric value by specifying metric type in a certain time.
+     *
+     * @param metricType metric type (e.g., load, usage, etc.)
+     * @param value metric value
+     * @param time update time in seconds
+     */
+    void updateMetric(String metricType, double value, long time);
+
+    /**
+     * Update metric values of a collection of metric types.
+     *
+     * @param metrics a collection of metrics which consists of a pair of
+     *                metric type and metric value
+     * @param time    update time in seconds
+     */
+    void updateMetrics(Map<String, Double> metrics, long time);
+
+    /**
+     * Update metric values of a collection of metric types.
+     *
+     * @param metrics a collection of metrics which consists of a pair of
+     *                metric type and metric value
+     */
+    void updateMetrics(Map<String, Double> metrics);
+
+    /**
+     * Returns most recent metric value of a given metric type.
+     *
+     * @param metricType metric type
+     * @return metric value
+     */
+    double recentMetric(String metricType);
+
+    /**
+     * Return most recent metric values of a given metric type for a given period.
+     *
+     * @param metricType metric type
+     * @param duration duration
+     * @param unit time unit
+     * @return a collection of metric value
+     */
+    double[] recentMetrics(String metricType, int duration, TimeUnit unit);
+
+    /**
+     * Returns minimum metric value of a given metric type.
+     *
+     * @param metricType metric type
+     * @return metric value
+     */
+    double minMetric(String metricType);
+
+    /**
+     * Returns maximum metric value of a given metric type.
+     *
+     * @param metricType metric type
+     * @return metric value
+     */
+    double maxMetric(String metricType);
+
+    /**
+     * Returns a collection of metric values of a given metric type for a day.
+     *
+     * @param metricType metric type
+     * @return a collection of metric value
+     */
+    double[] metrics(String metricType);
+
+    /**
+     * Returns a collection of metric values of a given metric type for
+     * a given period.
+     *
+     * @param metricType metric type
+     * @param startTime start time
+     * @param endTime end time
+     * @return a collection of metric value
+     */
+    double[] metrics(String metricType, long startTime, long endTime);
+
+    /**
+     * A builder of MetricsDatabase.
+     */
+    interface Builder {
+
+        /**
+         * Sets the metric name.
+         *
+         * @param metricName metric name
+         * @return builder object
+         */
+        Builder withMetricName(String metricName);
+
+        /**
+         * Add a new metric to be monitored.
+         *
+         * @param metricType control metric type
+         */
+        Builder addMetricType(String metricType);
+
+        /**
+         * Builds a MetricDatabase instance.
+         *
+         * @return MetricDatabase instance
+         */
+        MetricsDatabase build();
+    }
+}
diff --git a/apps/cpman/app/pom.xml b/apps/cpman/app/pom.xml
index b7c76ae..78d88bf 100644
--- a/apps/cpman/app/pom.xml
+++ b/apps/cpman/app/pom.xml
@@ -105,6 +105,11 @@
             <version>1.1.1</version>
         </dependency>
         <dependency>
+            <groupId>org.rrd4j</groupId>
+            <artifactId>rrd4j</artifactId>
+            <version>2.2</version>
+        </dependency>
+        <dependency>
             <groupId>com.sun.jersey</groupId>
             <artifactId>jersey-servlet</artifactId>
         </dependency>
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/ControlPlaneMonitor.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/ControlPlaneMonitor.java
index e1cbdad..5ffe4a4 100644
--- a/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/ControlPlaneMonitor.java
+++ b/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/ControlPlaneMonitor.java
@@ -43,7 +43,6 @@
 
     @Activate
     public void activate() {
-
     }
 
     @Deactivate
@@ -53,6 +52,7 @@
     @Override
     public void updateMetric(ControlMetric cpm, Integer updateInterval,
                              Optional<DeviceId> deviceId) {
+
     }
 
     @Override
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/DefaultMetricsDatabase.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/DefaultMetricsDatabase.java
new file mode 100644
index 0000000..7c71410
--- /dev/null
+++ b/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/DefaultMetricsDatabase.java
@@ -0,0 +1,286 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cpman.impl;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.onosproject.cpman.MetricsDatabase;
+import org.rrd4j.ConsolFun;
+import org.rrd4j.DsType;
+import org.rrd4j.core.ArcDef;
+import org.rrd4j.core.DsDef;
+import org.rrd4j.core.FetchRequest;
+import org.rrd4j.core.RrdBackendFactory;
+import org.rrd4j.core.RrdDb;
+import org.rrd4j.core.RrdDef;
+import org.rrd4j.core.Sample;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * An implementation of control plane metrics back-end database.
+ */
+public final class DefaultMetricsDatabase implements MetricsDatabase {
+    private String metricName;
+    private RrdDb rrdDb;
+    private Sample sample;
+    private static final long SECONDS_OF_DAY = 60L * 60L * 24L;
+    private static final long SECONDS_OF_MINUTE = 60L;
+    private static final ConsolFun CONSOL_FUNCTION = ConsolFun.LAST;
+    private static final String NON_EXIST_METRIC = "Non-existing metric type.";
+    private static final String INSUFFICIENT_DURATION = "Given duration less than one minute.";
+    private static final String EXCEEDED_DURATION = "Given duration exceeds a day time.";
+
+    private DefaultMetricsDatabase(String metricName, RrdDb rrdDb) {
+        this.metricName = metricName;
+        this.rrdDb = rrdDb;
+    }
+
+    @Override
+    public String metricName() {
+        return this.metricName;
+    }
+
+    @Override
+    public void updateMetric(String metricType, double value) {
+        updateMetric(metricType, value, System.currentTimeMillis() / 1000L);
+    }
+
+    @Override
+    public void updateMetric(String metricType, double value, long time) {
+        try {
+            checkArgument(rrdDb.containsDs(metricType), NON_EXIST_METRIC);
+            sample = rrdDb.createSample(time);
+            sample.setValue(metricType, value);
+            sample.update();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void updateMetrics(Map<String, Double> metrics) {
+        updateMetrics(metrics, System.currentTimeMillis() / 1000L);
+    }
+
+    @Override
+    public void updateMetrics(Map<String, Double> metrics, long time) {
+        try {
+            sample = rrdDb.createSample(time);
+            metrics.forEach((k, v) -> {
+                try {
+                    checkArgument(rrdDb.containsDs(k), NON_EXIST_METRIC);
+                    sample.setValue(k, v);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            });
+            sample.update();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public double recentMetric(String metricType) {
+        try {
+            checkArgument(rrdDb.containsDs(metricType), NON_EXIST_METRIC);
+            return rrdDb.getDatasource(metricType).getLastValue();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return 0D;
+    }
+
+    @Override
+    public double[] recentMetrics(String metricType, int duration, TimeUnit unit) {
+        try {
+            checkArgument(rrdDb.containsDs(metricType), NON_EXIST_METRIC);
+            long endTime = rrdDb.getLastUpdateTime();
+            long startTime = endTime - TimeUnit.SECONDS.convert(duration, unit);
+            if (checkTimeRange(startTime, endTime)) {
+                FetchRequest fr = rrdDb.createFetchRequest(CONSOL_FUNCTION, startTime, endTime);
+                return arrangeDataPoints(fr.fetchData().getValues(metricType));
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return new double[0];
+    }
+
+    @Override
+    public double minMetric(String metricType) {
+        try {
+            checkArgument(rrdDb.containsDs(metricType), NON_EXIST_METRIC);
+            long endTime = rrdDb.getLastUpdateTime() - 1;
+            long startTime = endTime - SECONDS_OF_DAY + 1;
+            return minMetric(metricType, startTime, endTime);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return 0D;
+    }
+
+    @Override
+    public double maxMetric(String metricType) {
+        try {
+            checkArgument(rrdDb.containsDs(metricType), NON_EXIST_METRIC);
+            long endTime = rrdDb.getLastUpdateTime();
+            long startTime = endTime - SECONDS_OF_DAY;
+            return maxMetric(metricType, startTime, endTime);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return 0D;
+    }
+
+    @Override
+    public double[] metrics(String metricType) {
+        try {
+            checkArgument(rrdDb.containsDs(metricType), NON_EXIST_METRIC);
+            long endTime = rrdDb.getLastUpdateTime();
+            long startTime = endTime - SECONDS_OF_DAY;
+            return metrics(metricType, startTime, endTime);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return new double[0];
+    }
+
+    @Override
+    public double[] metrics(String metricType, long startTime, long endTime) {
+        try {
+            checkArgument(rrdDb.containsDs(metricType), NON_EXIST_METRIC);
+            if (checkTimeRange(startTime, endTime)) {
+                FetchRequest fr = rrdDb.createFetchRequest(CONSOL_FUNCTION, startTime, endTime);
+                return arrangeDataPoints(fr.fetchData().getValues(metricType));
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return new double[0];
+    }
+
+    // try to check whether projected time range is within a day
+    private boolean checkTimeRange(long startTime, long endTime) {
+        // check whether the given startTime and endTime larger than 1 minute
+        checkArgument(endTime - startTime >= SECONDS_OF_MINUTE, INSUFFICIENT_DURATION);
+
+        // check whether the given start time and endTime smaller than 1 day
+        checkArgument(endTime - startTime <= SECONDS_OF_DAY, EXCEEDED_DURATION);
+        return true;
+    }
+
+    // try to remove first and last data points
+    private double[] arrangeDataPoints(double[] data) {
+        return Arrays.copyOfRange(data, 1, data.length - 1);
+    }
+
+    // obtains maximum metric value among projected range
+    private double maxMetric(String metricType, long startTime, long endTime) {
+        double[] all = metrics(metricType, startTime, endTime);
+        List list = Arrays.asList(ArrayUtils.toObject(all));
+        return (double) Collections.max(list);
+    }
+
+    // obtains minimum metric value among projected range
+    private double minMetric(String metricType, long startTime, long endTime) {
+        double[] all = metrics(metricType, startTime, endTime);
+        List list = Arrays.asList(ArrayUtils.toObject(all));
+        return (double) Collections.min(list);
+    }
+
+    public static final class Builder implements MetricsDatabase.Builder {
+        private static final int RESOLUTION = 60; // seconds
+        private static final String STORING_METHOD = "MEMORY";
+        private static final DsType SOURCE_TYPE = DsType.GAUGE;
+        private static final String DB_PATH = "CPMAN";
+        private static final ConsolFun CONSOL_FUNCTION = ConsolFun.LAST;
+        private static final double MIN_VALUE = 0;
+        private static final double MAX_VALUE = Double.NaN;
+        private static final double XFF_VALUE = 0.2;
+        private static final int STEP_VALUE = 1;
+        private static final int ROW_VALUE = 60 * 24;
+        private static final String METRIC_NAME_MSG = "Must specify a metric name.";
+        private static final String METRIC_TYPE_MSG = "Must supply at least a metric type.";
+
+        private RrdDb rrdDb;
+        private RrdDef rrdDef;
+        private List<DsDef> dsDefs;
+        private String metricName;
+
+        public Builder() {
+
+            // define the resolution of monitored metrics
+            rrdDef = new RrdDef(DB_PATH, RESOLUTION);
+
+            // initialize data source definition list
+            dsDefs = new ArrayList<>();
+        }
+
+        @Override
+        public Builder withMetricName(String metricName) {
+            this.metricName = metricName;
+            return this;
+        }
+
+        @Override
+        public Builder addMetricType(String metricType) {
+            dsDefs.add(defineSchema(metricType));
+            return this;
+        }
+
+        @Override
+        public MetricsDatabase build() {
+            checkNotNull(metricName, METRIC_NAME_MSG);
+            checkArgument(dsDefs.size() != 0, METRIC_TYPE_MSG);
+
+            try {
+                DsDef[] dsDefArray = new DsDef[dsDefs.size()];
+                IntStream.range(0, dsDefs.size()).forEach(i -> dsDefArray[i] = dsDefs.get(i));
+
+                rrdDef.addDatasource(dsDefArray);
+                rrdDef.setStep(RESOLUTION);
+
+                // raw archive, no aggregation is required
+                ArcDef rawArchive = new ArcDef(CONSOL_FUNCTION, XFF_VALUE,
+                        STEP_VALUE, ROW_VALUE);
+                rrdDef.addArchive(rawArchive);
+
+                // always store the metric data in memory...
+                rrdDb = new RrdDb(rrdDef, RrdBackendFactory.getFactory(STORING_METHOD));
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+
+            return new DefaultMetricsDatabase(metricName, rrdDb);
+        }
+
+        private DsDef defineSchema(String metricType) {
+            return new DsDef(metricType, SOURCE_TYPE, RESOLUTION,
+                    MIN_VALUE, MAX_VALUE);
+        }
+    }
+}
diff --git a/apps/cpman/app/src/test/java/org/onosproject/cpman/impl/MetricsDatabaseTest.java b/apps/cpman/app/src/test/java/org/onosproject/cpman/impl/MetricsDatabaseTest.java
new file mode 100644
index 0000000..6058227
--- /dev/null
+++ b/apps/cpman/app/src/test/java/org/onosproject/cpman/impl/MetricsDatabaseTest.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cpman.impl;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.cpman.MetricsDatabase;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit test for control plane metrics back-end database.
+ */
+public class MetricsDatabaseTest {
+
+    private MetricsDatabase mdb;
+    private static final String CPU_METRIC = "cpu";
+    private static final String CPU_LOAD = "load";
+    private static final String MEMORY_METRIC = "memory";
+    private static final String MEMORY_FREE_PERC = "freePerc";
+    private static final String MEMORY_USED_PERC = "usedPerc";
+
+    /**
+     * Initializes the MetricsDatabase instance.
+     */
+    @Before
+    public void setUp() {
+        mdb = new DefaultMetricsDatabase.Builder()
+                .withMetricName(CPU_METRIC)
+                .addMetricType(CPU_LOAD)
+                .build();
+    }
+
+    /**
+     * Tests the metric update function.
+     */
+    @Test
+    public void testMetricUpdate() {
+        long currentTime = System.currentTimeMillis() / 1000L;
+
+        mdb.updateMetric(CPU_LOAD, 30, currentTime);
+        assertThat(30D, is(mdb.recentMetric(CPU_LOAD)));
+
+        mdb.updateMetric(CPU_LOAD, 40, currentTime + 60);
+        assertThat(40D, is(mdb.recentMetric(CPU_LOAD)));
+
+        mdb.updateMetric(CPU_LOAD, 50, currentTime + 120);
+        assertThat(50D, is(mdb.recentMetric(CPU_LOAD)));
+    }
+
+    /**
+     * Tests the metric range fetch function.
+     */
+    @Test
+    public void testMetricRangeFetch() {
+        // full range fetch
+        assertThat(mdb.metrics(CPU_LOAD).length, is(60 * 24));
+
+        // query one minute time range
+        assertThat(mdb.recentMetrics(CPU_LOAD, 1, TimeUnit.MINUTES).length, is(1));
+
+        // query one hour time range
+        assertThat(mdb.recentMetrics(CPU_LOAD, 1, TimeUnit.HOURS).length, is(60));
+
+        // query one day time range
+        assertThat(mdb.recentMetrics(CPU_LOAD, 1, TimeUnit.DAYS).length, is(60 * 24));
+
+        // query a specific time range
+        long endTime = System.currentTimeMillis() / 1000L;
+        long startTime = endTime - 60 * 5;
+        assertThat(mdb.metrics(CPU_LOAD, startTime, endTime).length, is(5));
+    }
+
+    /**
+     * Test the projected time range.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testExceededTimeRange() {
+        // query 25 hours time range
+        assertThat(mdb.recentMetrics(CPU_LOAD, 25, TimeUnit.HOURS).length, is(60 * 24));
+    }
+
+    /**
+     * Test the projected time range.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testInsufficientTimeRange() {
+        // query 50 seconds time range
+        assertThat(mdb.recentMetrics(CPU_LOAD, 50, TimeUnit.SECONDS).length, is(1));
+    }
+
+    /**
+     * Test multiple metrics update and query.
+     */
+    @Test
+    public void testMultipleMetrics() {
+        MetricsDatabase multiMdb = new DefaultMetricsDatabase.Builder()
+                        .withMetricName(MEMORY_METRIC)
+                        .addMetricType(MEMORY_FREE_PERC)
+                        .addMetricType(MEMORY_USED_PERC)
+                        .build();
+
+        Map<String, Double> metrics = new HashMap<>();
+        metrics.putIfAbsent(MEMORY_FREE_PERC, 30D);
+        metrics.putIfAbsent(MEMORY_USED_PERC, 70D);
+        multiMdb.updateMetrics(metrics);
+
+        assertThat(30D, is(multiMdb.recentMetric(MEMORY_FREE_PERC)));
+        assertThat(70D, is(multiMdb.recentMetric(MEMORY_USED_PERC)));
+    }
+}