Aggregate system metrics using the metrics service

Change-Id: I617fa21973b7e01b92f311a6fa5687e1f0f870c2
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java
index 3e8c57b..8b4244f 100644
--- a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java
+++ b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java
@@ -19,6 +19,7 @@
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.commons.lang3.StringUtils;
+import org.onlab.metrics.MetricsService;
 import org.onosproject.cpman.ControlMetric;
 import org.onosproject.cpman.ControlMetricType;
 import org.onosproject.cpman.ControlPlaneMonitorService;
@@ -26,6 +27,7 @@
 import org.onosproject.cpman.MetricValue;
 import org.onosproject.cpman.SystemInfo;
 import org.onosproject.cpman.impl.DefaultSystemInfo;
+import org.onosproject.cpman.impl.SystemMetricsAggregator;
 import org.onosproject.cpman.impl.SystemInfoFactory;
 import org.onosproject.rest.AbstractWebResource;
 import org.slf4j.Logger;
@@ -53,7 +55,9 @@
 public class SystemMetricsCollectorWebResource extends AbstractWebResource {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private final ControlPlaneMonitorService service = get(ControlPlaneMonitorService.class);
+    private final ControlPlaneMonitorService monitorService = get(ControlPlaneMonitorService.class);
+    private final MetricsService metricsService = get(MetricsService.class);
+
     private static final int UPDATE_INTERVAL_IN_MINUTE = 1;
     private static final String INVALID_SYSTEM_SPECS = "Invalid system specifications";
     private static final String INVALID_RESOURCE_NAME = "Invalid resource name";
@@ -68,6 +72,9 @@
             .stream().map(type -> toCamelCase(type.toString(), true))
             .collect(Collectors.toSet());
 
+    private SystemMetricsAggregator systemAggr =
+            new SystemMetricsAggregator(metricsService, Optional.ofNullable(null), "system");
+
     /**
      * Collects CPU metrics.
      *
@@ -98,28 +105,31 @@
 
             cm = new ControlMetric(ControlMetricType.CPU_LOAD,
                     new MetricValue.Builder().load(cpuLoad).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.CPU_LOAD, cpuLoad);
 
             cm = new ControlMetric(ControlMetricType.TOTAL_CPU_TIME,
                     new MetricValue.Builder().load(totalCpuTime).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.TOTAL_CPU_TIME, totalCpuTime);
 
             cm = new ControlMetric(ControlMetricType.SYS_CPU_TIME,
                     new MetricValue.Builder().load(sysCpuTime).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.SYS_CPU_TIME, sysCpuTime);
 
             cm = new ControlMetric(ControlMetricType.USER_CPU_TIME,
                     new MetricValue.Builder().load(userCpuTime).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.USER_CPU_TIME, userCpuTime);
 
             cm = new ControlMetric(ControlMetricType.CPU_IDLE_TIME,
                     new MetricValue.Builder().load(cpuIdleTime).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.CPU_IDLE_TIME, cpuIdleTime);
 
         } catch (IOException e) {
             throw new IllegalArgumentException(e.getMessage());
-        } catch (IllegalArgumentException iae) {
-            log.error("[CPU] Illegal arguments in JSON input, msg: {}", iae.getMessage());
         }
         return ok(root).build();
     }
@@ -153,24 +163,26 @@
 
             cm = new ControlMetric(ControlMetricType.MEMORY_USED_RATIO,
                     new MetricValue.Builder().load(memUsedRatio).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.MEMORY_USED_RATIO, memUsedRatio);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_FREE_RATIO,
                     new MetricValue.Builder().load(memFreeRatio).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.MEMORY_FREE_RATIO, memFreeRatio);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_USED,
                     new MetricValue.Builder().load(memUsed).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.MEMORY_USED, memUsed);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_FREE,
                     new MetricValue.Builder().load(memFree).add());
-            service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            systemAggr.increment(ControlMetricType.MEMORY_FREE, memFree);
 
         } catch (IOException e) {
             throw new IllegalArgumentException(e.getMessage());
-        } catch (IllegalArgumentException iae) {
-            log.error("[RAM] Illegal arguments in JSON input, msg: {}", iae.getMessage());
         }
         return ok(root).build();
     }
@@ -198,21 +210,24 @@
                 JsonNode resourceName = node.get("resourceName");
                 nullIsIllegal(resourceName, INVALID_RESOURCE_NAME);
 
+                SystemMetricsAggregator diskAggr = new SystemMetricsAggregator(metricsService,
+                        Optional.of(resourceName.asText()), "disk");
+
                 long readBytes = nullIsIllegal(node.get("readBytes").asLong(), INVALID_REQUEST);
                 long writeBytes = nullIsIllegal(node.get("writeBytes").asLong(), INVALID_REQUEST);
 
                 cm = new ControlMetric(ControlMetricType.DISK_READ_BYTES,
                         new MetricValue.Builder().load(readBytes).add());
-                service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                diskAggr.increment(ControlMetricType.DISK_READ_BYTES, readBytes);
 
                 cm = new ControlMetric(ControlMetricType.DISK_WRITE_BYTES,
                         new MetricValue.Builder().load(writeBytes).add());
-                service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                diskAggr.increment(ControlMetricType.DISK_WRITE_BYTES, writeBytes);
             }
         } catch (IOException e) {
             throw new IllegalArgumentException(e.getMessage());
-        } catch (IllegalArgumentException iae) {
-            log.error("[DISK] Illegal arguments in JSON input, msg: {}", iae.getMessage());
         }
         return ok(root).build();
     }
@@ -241,6 +256,9 @@
                 JsonNode resourceName = node.get("resourceName");
                 nullIsIllegal(resourceName, INVALID_RESOURCE_NAME);
 
+                SystemMetricsAggregator networkAggr = new SystemMetricsAggregator(metricsService,
+                        Optional.of(resourceName.asText()), "network");
+
                 long inBytes = nullIsIllegal(node.get("incomingBytes").asLong(), INVALID_REQUEST);
                 long outBytes = nullIsIllegal(node.get("outgoingBytes").asLong(), INVALID_REQUEST);
                 long inPackets = nullIsIllegal(node.get("incomingPackets").asLong(), INVALID_REQUEST);
@@ -248,19 +266,23 @@
 
                 cm = new ControlMetric(ControlMetricType.NW_INCOMING_BYTES,
                         new MetricValue.Builder().load(inBytes).add());
-                service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                networkAggr.increment(ControlMetricType.NW_INCOMING_BYTES, inBytes);
 
                 cm = new ControlMetric(ControlMetricType.NW_OUTGOING_BYTES,
                         new MetricValue.Builder().load(outBytes).add());
-                service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                networkAggr.increment(ControlMetricType.NW_OUTGOING_BYTES, outBytes);
 
                 cm = new ControlMetric(ControlMetricType.NW_INCOMING_PACKETS,
                         new MetricValue.Builder().load(inPackets).add());
-                service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                networkAggr.increment(ControlMetricType.NW_INCOMING_PACKETS, inPackets);
 
                 cm = new ControlMetric(ControlMetricType.NW_OUTGOING_PACKETS,
                         new MetricValue.Builder().load(outPackets).add());
-                service.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
+                networkAggr.increment(ControlMetricType.NW_OUTGOING_PACKETS, outPackets);
             }
         } catch (IOException e) {
             throw new IllegalArgumentException(e.getMessage());