Aggregate system metrics using metrics service

Change-Id: I617fa21973b7e01b92f311a6fa5687e1f0f870c2
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/SystemMetricsAggregator.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/SystemMetricsAggregator.java
new file mode 100644
index 0000000..900ddea
--- /dev/null
+++ b/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/SystemMetricsAggregator.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cpman.impl;
+
+import com.codahale.metrics.Meter;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.onlab.metrics.MetricsComponent;
+import org.onlab.metrics.MetricsFeature;
+import org.onlab.metrics.MetricsService;
+import org.onosproject.cpman.ControlMetricType;
+import org.onosproject.cpman.ControlResource;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Aggregate system metrics.
+ */
+public class SystemMetricsAggregator {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final String DEFAULT_RESOURCE_NAME = "system";
+    private static final String DEFAULT_METER_SUFFIX = "rate";
+    private static final String DISK_RESOURCE_TYPE = "disk";
+    private static final String NETWORK_RESOURCE_TYPE = "network";
+    private final Map<ControlMetricType, Meter> meterMap = Maps.newHashMap();
+    private final Set<ControlMetricType> metricTypeSet = Sets.newHashSet();
+
+    public SystemMetricsAggregator(MetricsService metricsService, Optional<String> resName, String resType) {
+        String resourceName = resName.isPresent() ? resName.get() : DEFAULT_RESOURCE_NAME;
+        MetricsComponent mc = metricsService.registerComponent(resourceName);
+
+        if (resName.isPresent()) {
+            if (DISK_RESOURCE_TYPE.equals(resType)) {
+                metricTypeSet.addAll(ControlResource.DISK_METRICS);
+            } else if (NETWORK_RESOURCE_TYPE.equals(resType)) {
+                metricTypeSet.addAll(ControlResource.NETWORK_METRICS);
+            } else {
+                log.warn("Not valid resource type {}", resType);
+            }
+        } else {
+            metricTypeSet.addAll(ControlResource.MEMORY_METRICS);
+            metricTypeSet.addAll(ControlResource.CPU_METRICS);
+        }
+
+        metricTypeSet.forEach(type -> {
+            MetricsFeature metricsFeature = mc.registerFeature(type.toString());
+            Meter meter = metricsService.createMeter(mc, metricsFeature, DEFAULT_METER_SUFFIX);
+            meterMap.putIfAbsent(type, meter);
+        });
+    }
+
+    /**
+     * Increments metric value.
+     *
+     * @param type metric type
+     * @param value metric value
+     */
+    public void increment(ControlMetricType type, long value) {
+        meterMap.get(type).mark(value);
+    }
+}
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java
index 3e8c57b..8b4244f 100644
--- a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java
+++ b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java
@@ -19,6 +19,7 @@
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.commons.lang3.StringUtils;
+import org.onlab.metrics.MetricsService;
 import org.onosproject.cpman.ControlMetric;
 import org.onosproject.cpman.ControlMetricType;
 import org.onosproject.cpman.ControlPlaneMonitorService;
@@ -26,6 +27,7 @@
 import org.onosproject.cpman.MetricValue;
 import org.onosproject.cpman.SystemInfo;
 import org.onosproject.cpman.impl.DefaultSystemInfo;
+import org.onosproject.cpman.impl.SystemMetricsAggregator;
 import org.onosproject.cpman.impl.SystemInfoFactory;
 import org.onosproject.rest.AbstractWebResource;
 import org.slf4j.Logger;
@@ -53,7 +55,9 @@
 public class SystemMetricsCollectorWebResource extends AbstractWebResource {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final ControlPlaneMonitorService service = get(ControlPlaneMonitorService.class);
+    private final ControlPlaneMonitorService monitorService = get(ControlPlaneMonitorService.class);
+    private final MetricsService metricsService = get(MetricsService.class);
+
     private static final int UPDATE_INTERVAL_IN_MINUTE = 1;
     private static final String INVALID_SYSTEM_SPECS = "Invalid system specifications";
     private static final String INVALID_RESOURCE_NAME = "Invalid resource name";
@@ -68,6 +72,9 @@
             .stream().map(type -> toCamelCase(type.toString(), true))
             .collect(Collectors.toSet());
 
+    private SystemMetricsAggregator systemAggr =
+            new SystemMetricsAggregator(metricsService, Optional.ofNullable(null), "system");
+
     /**
      * Collects CPU metrics.
      *
@@ -98,28 +105,31 @@
 
             cm = new ControlMetric(ControlMetricType.CPU_LOAD,
                     new MetricValue.Builder().load(cpuLoad).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.CPU_LOAD, cpuLoad);
 
             cm = new ControlMetric(ControlMetricType.TOTAL_CPU_TIME,
                     new MetricValue.Builder().load(totalCpuTime).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.TOTAL_CPU_TIME, totalCpuTime);
 
             cm = new ControlMetric(ControlMetricType.SYS_CPU_TIME,
                     new MetricValue.Builder().load(sysCpuTime).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.SYS_CPU_TIME, sysCpuTime);
 
             cm = new ControlMetric(ControlMetricType.USER_CPU_TIME,
                     new MetricValue.Builder().load(userCpuTime).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.USER_CPU_TIME, userCpuTime);
 
             cm = new ControlMetric(ControlMetricType.CPU_IDLE_TIME,
                     new MetricValue.Builder().load(cpuIdleTime).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.CPU_IDLE_TIME, cpuIdleTime);
 
         } catch (IOException e) {
             throw new IllegalArgumentException(e.getMessage());
-        } catch (IllegalArgumentException iae) {
-            log.error("[CPU] Illegal arguments in JSON input, msg: {}", iae.getMessage());
         }
         return ok(root).build();
     }
@@ -153,24 +163,26 @@
 
             cm = new ControlMetric(ControlMetricType.MEMORY_USED_RATIO,
                     new MetricValue.Builder().load(memUsedRatio).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.MEMORY_USED_RATIO, memUsedRatio);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_FREE_RATIO,
                     new MetricValue.Builder().load(memFreeRatio).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.MEMORY_FREE_RATIO, memFreeRatio);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_USED,
                     new MetricValue.Builder().load(memUsed).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.MEMORY_USED, memUsed);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_FREE,
                     new MetricValue.Builder().load(memFree).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.MEMORY_FREE, memFree);
 
         } catch (IOException e) {
             throw new IllegalArgumentException(e.getMessage());
-        } catch (IllegalArgumentException iae) {
-            log.error("[RAM] Illegal arguments in JSON input, msg: {}", iae.getMessage());
         }
         return ok(root).build();
     }
@@ -198,21 +210,24 @@
                 JsonNode resourceName = node.get("resourceName");
                 nullIsIllegal(resourceName, INVALID_RESOURCE_NAME);
 
+                SystemMetricsAggregator diskAggr = new SystemMetricsAggregator(metricsService,
+                        Optional.of(resourceName.asText()), "disk");
+
                 long readBytes = nullIsIllegal(node.get("readBytes").asLong(), INVALID_REQUEST);
                 long writeBytes = nullIsIllegal(node.get("writeBytes").asLong(), INVALID_REQUEST);
 
                 cm = new ControlMetric(ControlMetricType.DISK_READ_BYTES,
                         new MetricValue.Builder().load(readBytes).add());
-                service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                diskAggr.increment(ControlMetricType.DISK_READ_BYTES, readBytes);
 
                 cm = new ControlMetric(ControlMetricType.DISK_WRITE_BYTES,
                         new MetricValue.Builder().load(writeBytes).add());
-                service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                diskAggr.increment(ControlMetricType.DISK_WRITE_BYTES, writeBytes);
             }
         } catch (IOException e) {
             throw new IllegalArgumentException(e.getMessage());
-        } catch (IllegalArgumentException iae) {
-            log.error("[DISK] Illegal arguments in JSON input, msg: {}", iae.getMessage());
         }
         return ok(root).build();
     }
@@ -241,6 +256,9 @@
                 JsonNode resourceName = node.get("resourceName");
                 nullIsIllegal(resourceName, INVALID_RESOURCE_NAME);
 
+                SystemMetricsAggregator networkAggr = new SystemMetricsAggregator(metricsService,
+                        Optional.of(resourceName.asText()), "network");
+
                 long inBytes = nullIsIllegal(node.get("incomingBytes").asLong(), INVALID_REQUEST);
                 long outBytes = nullIsIllegal(node.get("outgoingBytes").asLong(), INVALID_REQUEST);
                 long inPackets = nullIsIllegal(node.get("incomingPackets").asLong(), INVALID_REQUEST);
@@ -248,19 +266,23 @@
 
                 cm = new ControlMetric(ControlMetricType.NW_INCOMING_BYTES,
                         new MetricValue.Builder().load(inBytes).add());
-                service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                networkAggr.increment(ControlMetricType.NW_INCOMING_BYTES, inBytes);
 
                 cm = new ControlMetric(ControlMetricType.NW_OUTGOING_BYTES,
                         new MetricValue.Builder().load(outBytes).add());
-                service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                networkAggr.increment(ControlMetricType.NW_OUTGOING_BYTES, outBytes);
 
                 cm = new ControlMetric(ControlMetricType.NW_INCOMING_PACKETS,
                         new MetricValue.Builder().load(inPackets).add());
-                service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                networkAggr.increment(ControlMetricType.NW_INCOMING_PACKETS, inPackets);
 
                 cm = new ControlMetric(ControlMetricType.NW_OUTGOING_PACKETS,
                         new MetricValue.Builder().load(outPackets).add());
-                service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                networkAggr.increment(ControlMetricType.NW_OUTGOING_PACKETS, outPackets);
             }
         } catch (IOException e) {
             throw new IllegalArgumentException(e.getMessage());
diff --git a/apps/cpman/app/src/test/java/org/onosproject/cpman/rest/ControlMetricsCollectorResourceTest.java b/apps/cpman/app/src/test/java/org/onosproject/cpman/rest/ControlMetricsCollectorResourceTest.java
index a0ef91c..e4fa11f 100644
--- a/apps/cpman/app/src/test/java/org/onosproject/cpman/rest/ControlMetricsCollectorResourceTest.java
+++ b/apps/cpman/app/src/test/java/org/onosproject/cpman/rest/ControlMetricsCollectorResourceTest.java
@@ -15,28 +15,42 @@
  */
 package org.onosproject.cpman.rest;
 
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.util.Optional;
-
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.junit.Before;
 import org.junit.Test;
+import org.onlab.metrics.MetricsComponent;
+import org.onlab.metrics.MetricsFeature;
+import org.onlab.metrics.MetricsService;
 import org.onlab.osgi.ServiceDirectory;
 import org.onlab.osgi.TestServiceDirectory;
 import org.onlab.rest.BaseResource;
 import org.onosproject.cpman.ControlPlaneMonitorService;
 import org.onosproject.cpman.SystemInfo;
 import org.onosproject.cpman.impl.SystemInfoFactory;
+import org.onosproject.cpman.impl.SystemMetricsAggregator;
 import org.onosproject.net.DeviceId;
 import org.onosproject.rest.resources.ResourceTest;
 
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
 import static org.easymock.EasyMock.anyInt;
+import static org.easymock.EasyMock.anyLong;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.anyString;
 import static org.easymock.EasyMock.createMock;
@@ -53,6 +67,10 @@
 
     final ControlPlaneMonitorService mockControlPlaneMonitorService =
                                      createMock(ControlPlaneMonitorService.class);
+    final MetricsService mockMetricsService = new MockMetricsService();
+    final MetricsComponent mockMetricsComponent = createMock(MetricsComponent.class);
+    final SystemMetricsAggregator mockAggregator = createMock(SystemMetricsAggregator.class);
+
 
     private static final String PREFIX = "collector";
 
@@ -70,7 +88,8 @@
     public void setUpTest() {
         ServiceDirectory testDirectory =
                 new TestServiceDirectory()
-                        .add(ControlPlaneMonitorService.class, mockControlPlaneMonitorService);
+                        .add(ControlPlaneMonitorService.class, mockControlPlaneMonitorService)
+                        .add(MetricsService.class, mockMetricsService);
         BaseResource.setServiceDirectory(testDirectory);
     }
 
@@ -83,6 +102,11 @@
                 (Optional<DeviceId>) anyObject());
         expectLastCall().times(5);
         replay(mockControlPlaneMonitorService);
+
+        mockAggregator.increment(anyObject(), anyLong());
+        expectLastCall();
+        replay(mockAggregator);
+
         basePostTest("cpu-metrics-post.json", PREFIX + "/cpu_metrics");
     }
 
@@ -95,6 +119,7 @@
                 (Optional<DeviceId>) anyObject());
         expectLastCall().times(4);
         replay(mockControlPlaneMonitorService);
+
         basePostTest("memory-metrics-post.json", PREFIX + "/memory_metrics");
     }
 
@@ -106,6 +131,7 @@
         mockControlPlaneMonitorService.updateMetric(anyObject(), anyInt(), anyString());
         expectLastCall().times(4);
         replay(mockControlPlaneMonitorService);
+
         basePostTest("disk-metrics-post.json", PREFIX + "/disk_metrics");
     }
 
@@ -117,6 +143,7 @@
         mockControlPlaneMonitorService.updateMetric(anyObject(), anyInt(), anyString());
         expectLastCall().times(8);
         replay(mockControlPlaneMonitorService);
+
         basePostTest("network-metrics-post.json", PREFIX + "/network_metrics");
     }
 
@@ -147,4 +174,90 @@
         Response response = baseTest(jsonFile, path);
         assertThat(response.getStatus(), is(HttpURLConnection.HTTP_OK));
     }
+
+    private class MockMetricsService implements MetricsService {
+
+        @Override
+        public MetricsComponent registerComponent(String name) {
+            MetricsComponent metricsComponent = new MetricsComponent(name);
+            return metricsComponent;
+        }
+
+        @Override
+        public MetricRegistry getMetricRegistry() {
+            return null;
+        }
+
+        @Override
+        public Counter createCounter(MetricsComponent component, MetricsFeature feature,
+                                     String metricName) {
+            return null;
+        }
+
+        @Override
+        public Histogram createHistogram(MetricsComponent component,
+                                         MetricsFeature feature, String metricName) {
+            return null;
+        }
+
+        @Override
+        public Timer createTimer(MetricsComponent component,
+                                 MetricsFeature feature, String metricName) {
+            return null;
+        }
+
+        @Override
+        public Meter createMeter(MetricsComponent component,
+                                 MetricsFeature feature, String metricName) {
+            return new Meter();
+        }
+
+        @Override
+        public <T extends Metric> T registerMetric(MetricsComponent component,
+                                                   MetricsFeature feature,
+                                                   String metricName, T metric) {
+            return null;
+        }
+
+        @Override
+        public boolean removeMetric(MetricsComponent component,
+                                    MetricsFeature feature, String metricName) {
+            return false;
+        }
+
+        @Override
+        public Map<String, Timer> getTimers(MetricFilter filter) {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public Map<String, Gauge> getGauges(MetricFilter filter) {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public Map<String, Counter> getCounters(MetricFilter filter) {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public Map<String, Meter> getMeters(MetricFilter filter) {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public Map<String, Histogram> getHistograms(MetricFilter filter) {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public Map<String, Metric> getMetrics() {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public void removeMatching(MetricFilter filter) {
+
+        }
+    }
 }
diff --git a/utils/misc/src/main/java/org/onlab/metrics/MetricsComponent.java b/utils/misc/src/main/java/org/onlab/metrics/MetricsComponent.java
index cc8fe83..9103921 100644
--- a/utils/misc/src/main/java/org/onlab/metrics/MetricsComponent.java
+++ b/utils/misc/src/main/java/org/onlab/metrics/MetricsComponent.java
@@ -35,7 +35,7 @@
      *
      * @param newName name of the component
      */
-    MetricsComponent(final String newName) {
+    public MetricsComponent(final String newName) {
         name = newName;
     }