Notify all metric reporters when new types of system metric added

Change-Id: I307be0cb68bdc7fc3c75212c3fac4390fb9391a9
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/SystemMetricsAggregator.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/SystemMetricsAggregator.java
index f3c7761..19887a1 100644
--- a/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/SystemMetricsAggregator.java
+++ b/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/SystemMetricsAggregator.java
@@ -34,7 +34,7 @@
 /**
  * Aggregate system metrics.
  */
-public class SystemMetricsAggregator {
+public final class SystemMetricsAggregator {
 
     private final Logger log = getLogger(getClass());
 
@@ -42,40 +42,108 @@
     private static final String DEFAULT_METER_SUFFIX = "rate";
     private static final String DISK_RESOURCE_TYPE = "disk";
     private static final String NETWORK_RESOURCE_TYPE = "network";
-    private final Map<ControlMetricType, Meter> meterMap = Maps.newHashMap();
-    private final Set<ControlMetricType> metricTypeSet = Sets.newHashSet();
+    private final Map<ControlMetricType, Meter> systemMap = Maps.newHashMap();
+    private final Map<String, Map<ControlMetricType, Meter>> diskMap = Maps.newHashMap();
+    private final Map<String, Map<ControlMetricType, Meter>> networkMap = Maps.newHashMap();
 
-    public SystemMetricsAggregator(MetricsService metricsService, Optional<String> resName, String resType) {
-        String resourceName = resName.isPresent() ? resName.get() : DEFAULT_RESOURCE_NAME;
-        MetricsComponent mc = metricsService.registerComponent(resourceName);
+    private MetricsService metricsService;
 
-        if (resName.isPresent()) {
-            if (DISK_RESOURCE_TYPE.equals(resType)) {
-                metricTypeSet.addAll(ControlResource.DISK_METRICS);
-            } else if (NETWORK_RESOURCE_TYPE.equals(resType)) {
-                metricTypeSet.addAll(ControlResource.NETWORK_METRICS);
-            } else {
-                log.warn("Not valid resource type {}", resType);
-            }
-        } else {
-            metricTypeSet.addAll(ControlResource.MEMORY_METRICS);
-            metricTypeSet.addAll(ControlResource.CPU_METRICS);
-        }
-
-        metricTypeSet.forEach(type -> {
-            MetricsFeature metricsFeature = mc.registerFeature(type.toString());
-            Meter meter = metricsService.createMeter(mc, metricsFeature, DEFAULT_METER_SUFFIX);
-            meterMap.putIfAbsent(type, meter);
-        });
+    public static SystemMetricsAggregator getInstance() {
+        return SingletonHelper.INSTANCE;
     }
 
     /**
-     * Increments metric value.
+     * Configures metric services.
      *
-     * @param type metric type
+     * @param service metrics service
+     */
+    public void setMetricsService(MetricsService service) {
+
+        metricsService = service;
+    }
+
+    /**
+     * Increments system metric value.
+     *
+     * @param type  metric type
      * @param value metric value
      */
     public void increment(ControlMetricType type, long value) {
-        meterMap.get(type).mark(value);
+        systemMap.get(type).mark(value);
+    }
+
+    /**
+     * Increments disk or network metric value.
+     *
+     * @param resourceName resource name
+     * @param resourceType resource type
+     * @param type         control metric type
+     * @param value        metric value
+     */
+    public void increment(String resourceName, String resourceType, ControlMetricType type, long value) {
+        if (DISK_RESOURCE_TYPE.equals(resourceType) && diskMap.containsKey(resourceName)) {
+            diskMap.get(resourceName).get(type).mark(value);
+        }
+
+        if (NETWORK_RESOURCE_TYPE.equals(resourceType) && networkMap.containsKey(resourceName)) {
+            networkMap.get(resourceName).get(type).mark(value);
+        }
+    }
+
+    /**
+     * Adds a set of new monitoring metric types.
+     *
+     * @param optResourceName optional resource name, null denotes system metric
+     * @param resType         resource type
+     */
+    public void addMetrics(Optional<String> optResourceName, String resType) {
+        Set<ControlMetricType> metricTypeSet = Sets.newHashSet();
+        String resourceName = optResourceName.isPresent() ?
+                optResourceName.get() : DEFAULT_RESOURCE_NAME;
+
+        MetricsComponent metricsComponent = metricsService.registerComponent(resourceName);
+
+        if (optResourceName.isPresent()) {
+            if (!diskMap.containsKey(resourceName) && DISK_RESOURCE_TYPE.equals(resType)) {
+                metricTypeSet.addAll(ControlResource.DISK_METRICS);
+                diskMap.putIfAbsent(resourceName,
+                        getMeterMap(metricTypeSet, metricsComponent, metricsService));
+                metricsService.notifyReporters();
+            } else if (!networkMap.containsKey(resourceName) && NETWORK_RESOURCE_TYPE.equals(resType)) {
+                metricTypeSet.addAll(ControlResource.NETWORK_METRICS);
+                networkMap.putIfAbsent(resourceName,
+                        getMeterMap(metricTypeSet, metricsComponent, metricsService));
+                metricsService.notifyReporters();
+            } else {
+                return;
+            }
+        } else {
+            if (systemMap.isEmpty()) {
+                metricTypeSet.addAll(ControlResource.MEMORY_METRICS);
+                metricTypeSet.addAll(ControlResource.CPU_METRICS);
+
+                systemMap.putAll(getMeterMap(metricTypeSet, metricsComponent, metricsService));
+                metricsService.notifyReporters();
+            }
+        }
+    }
+
+    private Map<ControlMetricType, Meter> getMeterMap(Set<ControlMetricType> types,
+                                                      MetricsComponent component,
+                                                      MetricsService service) {
+        Map<ControlMetricType, Meter> meterMap = Maps.newHashMap();
+        types.forEach(type -> {
+            MetricsFeature metricsFeature = component.registerFeature(type.toString());
+            Meter meter = service.createMeter(component, metricsFeature, DEFAULT_METER_SUFFIX);
+            meterMap.putIfAbsent(type, meter);
+        });
+        return meterMap;
+    }
+
+    private SystemMetricsAggregator() {
+    }
+
+    private static class SingletonHelper {
+        private static final SystemMetricsAggregator INSTANCE = new SystemMetricsAggregator();
     }
 }
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java
index 6c565ff..c96d1d9 100644
--- a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java
+++ b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java
@@ -63,6 +63,9 @@
     private static final String INVALID_RESOURCE_NAME = "Invalid resource name";
     private static final String INVALID_REQUEST = "Invalid request";
     private static final int PERCENT_CONSTANT = 100;
+    private static final String SYSTEM_TYPE = "system";
+    private static final String DISK_TYPE = "disk";
+    private static final String NETWORK_TYPE = "network";
 
     private static final Set<String> MEMORY_FIELD_SET = ControlResource.MEMORY_METRICS
             .stream().map(type -> toCamelCase(type.toString(), true))
@@ -72,8 +75,7 @@
             .stream().map(type -> toCamelCase(type.toString(), true))
             .collect(Collectors.toSet());
 
-    private SystemMetricsAggregator systemAggr =
-            new SystemMetricsAggregator(metricsService, Optional.ofNullable(null), "system");
+    private SystemMetricsAggregator aggregator = SystemMetricsAggregator.getInstance();
 
     /**
      * Collects CPU metrics.
@@ -103,30 +105,33 @@
             long userCpuTime = nullIsIllegal(jsonTree.get("userCpuTime").asLong(), INVALID_REQUEST);
             long cpuIdleTime = nullIsIllegal(jsonTree.get("cpuIdleTime").asLong(), INVALID_REQUEST);
 
+            aggregator.setMetricsService(metricsService);
+            aggregator.addMetrics(Optional.ofNullable(null), SYSTEM_TYPE);
+
             cm = new ControlMetric(ControlMetricType.CPU_LOAD,
                     new MetricValue.Builder().load(cpuLoad).add());
             monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
-            systemAggr.increment(ControlMetricType.CPU_LOAD, cpuLoad);
+            aggregator.increment(ControlMetricType.CPU_LOAD, cpuLoad);
 
             cm = new ControlMetric(ControlMetricType.TOTAL_CPU_TIME,
                     new MetricValue.Builder().load(totalCpuTime).add());
             monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
-            systemAggr.increment(ControlMetricType.TOTAL_CPU_TIME, totalCpuTime);
+            aggregator.increment(ControlMetricType.TOTAL_CPU_TIME, totalCpuTime);
 
             cm = new ControlMetric(ControlMetricType.SYS_CPU_TIME,
                     new MetricValue.Builder().load(sysCpuTime).add());
             monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
-            systemAggr.increment(ControlMetricType.SYS_CPU_TIME, sysCpuTime);
+            aggregator.increment(ControlMetricType.SYS_CPU_TIME, sysCpuTime);
 
             cm = new ControlMetric(ControlMetricType.USER_CPU_TIME,
                     new MetricValue.Builder().load(userCpuTime).add());
             monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
-            systemAggr.increment(ControlMetricType.USER_CPU_TIME, userCpuTime);
+            aggregator.increment(ControlMetricType.USER_CPU_TIME, userCpuTime);
 
             cm = new ControlMetric(ControlMetricType.CPU_IDLE_TIME,
                     new MetricValue.Builder().load(cpuIdleTime).add());
             monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
-            systemAggr.increment(ControlMetricType.CPU_IDLE_TIME, cpuIdleTime);
+            aggregator.increment(ControlMetricType.CPU_IDLE_TIME, cpuIdleTime);
 
         } catch (IOException e) {
             throw new IllegalArgumentException(e.getMessage());
@@ -161,25 +166,28 @@
             long memUsedRatio = memTotal == 0L ? 0L : (memUsed * PERCENT_CONSTANT) / memTotal;
             long memFreeRatio = memTotal == 0L ? 0L : (memFree * PERCENT_CONSTANT) / memTotal;
 
+            aggregator.setMetricsService(metricsService);
+            aggregator.addMetrics(Optional.ofNullable(null), SYSTEM_TYPE);
+
             cm = new ControlMetric(ControlMetricType.MEMORY_USED_RATIO,
                     new MetricValue.Builder().load(memUsedRatio).add());
             monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
-            systemAggr.increment(ControlMetricType.MEMORY_USED_RATIO, memUsedRatio);
+            aggregator.increment(ControlMetricType.MEMORY_USED_RATIO, memUsedRatio);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_FREE_RATIO,
                     new MetricValue.Builder().load(memFreeRatio).add());
             monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
-            systemAggr.increment(ControlMetricType.MEMORY_FREE_RATIO, memFreeRatio);
+            aggregator.increment(ControlMetricType.MEMORY_FREE_RATIO, memFreeRatio);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_USED,
                     new MetricValue.Builder().load(memUsed).add());
             monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
-            systemAggr.increment(ControlMetricType.MEMORY_USED, memUsed);
+            aggregator.increment(ControlMetricType.MEMORY_USED, memUsed);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_FREE,
                     new MetricValue.Builder().load(memFree).add());
             monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
-            systemAggr.increment(ControlMetricType.MEMORY_FREE, memFree);
+            aggregator.increment(ControlMetricType.MEMORY_FREE, memFree);
 
         } catch (IOException e) {
             throw new IllegalArgumentException(e.getMessage());
@@ -204,14 +212,15 @@
         try {
             ObjectNode jsonTree = (ObjectNode) mapper().readTree(stream);
             ArrayNode diskRes =
-                    jsonTree.get("disks") == null ? mapper().createArrayNode() : (ArrayNode) jsonTree.get("disks");
+                    jsonTree.get("disks") == null ?
+                            mapper().createArrayNode() : (ArrayNode) jsonTree.get("disks");
 
             for (JsonNode node : diskRes) {
                 JsonNode resourceName = node.get("resourceName");
                 nullIsIllegal(resourceName, INVALID_RESOURCE_NAME);
 
-                SystemMetricsAggregator diskAggr = new SystemMetricsAggregator(metricsService,
-                        Optional.of(resourceName.asText()), "disk");
+                aggregator.setMetricsService(metricsService);
+                aggregator.addMetrics(Optional.of(resourceName.asText()), DISK_TYPE);
 
                 long readBytes = nullIsIllegal(node.get("readBytes").asLong(), INVALID_REQUEST);
                 long writeBytes = nullIsIllegal(node.get("writeBytes").asLong(), INVALID_REQUEST);
@@ -219,12 +228,13 @@
                 cm = new ControlMetric(ControlMetricType.DISK_READ_BYTES,
                         new MetricValue.Builder().load(readBytes).add());
                 monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
-                diskAggr.increment(ControlMetricType.DISK_READ_BYTES, readBytes);
-
+                aggregator.increment(resourceName.asText(), DISK_TYPE,
+                        ControlMetricType.DISK_READ_BYTES, readBytes);
                 cm = new ControlMetric(ControlMetricType.DISK_WRITE_BYTES,
                         new MetricValue.Builder().load(writeBytes).add());
                 monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
-                diskAggr.increment(ControlMetricType.DISK_WRITE_BYTES, writeBytes);
+                aggregator.increment(resourceName.asText(), DISK_TYPE,
+                        ControlMetricType.DISK_WRITE_BYTES, writeBytes);
             }
         } catch (IOException e) {
             throw new IllegalArgumentException(e.getMessage());
@@ -256,8 +266,8 @@
                 JsonNode resourceName = node.get("resourceName");
                 nullIsIllegal(resourceName, INVALID_RESOURCE_NAME);
 
-                SystemMetricsAggregator networkAggr = new SystemMetricsAggregator(metricsService,
-                        Optional.of(resourceName.asText()), "network");
+                aggregator.setMetricsService(metricsService);
+                aggregator.addMetrics(Optional.of(resourceName.asText()), NETWORK_TYPE);
 
                 long inBytes = nullIsIllegal(node.get("incomingBytes").asLong(), INVALID_REQUEST);
                 long outBytes = nullIsIllegal(node.get("outgoingBytes").asLong(), INVALID_REQUEST);
@@ -267,22 +277,26 @@
                 cm = new ControlMetric(ControlMetricType.NW_INCOMING_BYTES,
                         new MetricValue.Builder().load(inBytes).add());
                 monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
-                networkAggr.increment(ControlMetricType.NW_INCOMING_BYTES, inBytes);
+                aggregator.increment(resourceName.asText(), NETWORK_TYPE,
+                        ControlMetricType.NW_INCOMING_BYTES, inBytes);
 
                 cm = new ControlMetric(ControlMetricType.NW_OUTGOING_BYTES,
                         new MetricValue.Builder().load(outBytes).add());
                 monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
-                networkAggr.increment(ControlMetricType.NW_OUTGOING_BYTES, outBytes);
+                aggregator.increment(resourceName.asText(), NETWORK_TYPE,
+                        ControlMetricType.NW_OUTGOING_BYTES, outBytes);
 
                 cm = new ControlMetric(ControlMetricType.NW_INCOMING_PACKETS,
                         new MetricValue.Builder().load(inPackets).add());
                 monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
-                networkAggr.increment(ControlMetricType.NW_INCOMING_PACKETS, inPackets);
+                aggregator.increment(resourceName.asText(), NETWORK_TYPE,
+                        ControlMetricType.NW_INCOMING_PACKETS, inPackets);
 
                 cm = new ControlMetric(ControlMetricType.NW_OUTGOING_PACKETS,
                         new MetricValue.Builder().load(outPackets).add());
                 monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, resourceName.asText());
-                networkAggr.increment(ControlMetricType.NW_OUTGOING_PACKETS, outPackets);
+                aggregator.increment(resourceName.asText(), NETWORK_TYPE,
+                        ControlMetricType.NW_OUTGOING_PACKETS, outPackets);
             }
         } catch (IOException e) {
             throw new IllegalArgumentException(e.getMessage());
diff --git a/apps/cpman/app/src/test/java/org/onosproject/cpman/rest/ControlMetricsCollectorResourceTest.java b/apps/cpman/app/src/test/java/org/onosproject/cpman/rest/ControlMetricsCollectorResourceTest.java
index effb3de..fe89335 100644
--- a/apps/cpman/app/src/test/java/org/onosproject/cpman/rest/ControlMetricsCollectorResourceTest.java
+++ b/apps/cpman/app/src/test/java/org/onosproject/cpman/rest/ControlMetricsCollectorResourceTest.java
@@ -36,7 +36,6 @@
 import org.onosproject.cpman.ControlPlaneMonitorService;
 import org.onosproject.cpman.SystemInfo;
 import org.onosproject.cpman.impl.SystemInfoFactory;
-import org.onosproject.cpman.impl.SystemMetricsAggregator;
 import org.onosproject.net.DeviceId;
 import org.onosproject.rest.resources.ResourceTest;
 
@@ -51,7 +50,6 @@
 import java.util.Optional;
 
 import static org.easymock.EasyMock.anyInt;
-import static org.easymock.EasyMock.anyLong;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.anyString;
 import static org.easymock.EasyMock.createMock;
@@ -69,9 +67,6 @@
     final ControlPlaneMonitorService mockControlPlaneMonitorService =
                                      createMock(ControlPlaneMonitorService.class);
     final MetricsService mockMetricsService = new MockMetricsService();
-    final MetricsComponent mockMetricsComponent = createMock(MetricsComponent.class);
-    final SystemMetricsAggregator mockAggregator = createMock(SystemMetricsAggregator.class);
-
 
     private static final String PREFIX = "collector";
 
@@ -104,10 +99,6 @@
         expectLastCall().times(5);
         replay(mockControlPlaneMonitorService);
 
-        mockAggregator.increment(anyObject(), anyLong());
-        expectLastCall();
-        replay(mockAggregator);
-
         basePostTest("cpu-metrics-post.json", PREFIX + "/cpu_metrics");
     }