Enable querying of control metrics from a remote node

Change-Id: Ifef1c6eafd7cc79ed99be51f7faa26d97aeb2f67
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/CPManCodecRegistrator.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/CPManCodecRegistrator.java
index aa7709c..5afad2e 100644
--- a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/CPManCodecRegistrator.java
+++ b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/CPManCodecRegistrator.java
@@ -21,8 +21,8 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onosproject.codec.CodecService;
-import org.onosproject.cpman.ControlLoad;
-import org.onosproject.cpman.codec.ControlLoadCodec;
+import org.onosproject.cpman.ControlLoadSnapshot;
+import org.onosproject.cpman.codec.ControlLoadSnapshotCodec;
 import org.slf4j.Logger;
 
 import static org.slf4j.LoggerFactory.getLogger;
@@ -40,14 +40,14 @@
 
     @Activate
     public void activate() {
-        codecService.registerCodec(ControlLoad.class, new ControlLoadCodec());
+        codecService.registerCodec(ControlLoadSnapshot.class, new ControlLoadSnapshotCodec());
 
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
-        codecService.unregisterCodec(ControlLoad.class);
+        codecService.unregisterCodec(ControlLoadSnapshot.class);
 
         log.info("Stopped");
     }
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/ControlMetricsWebResource.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/ControlMetricsWebResource.java
index a5d0baf..5e985b0 100644
--- a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/ControlMetricsWebResource.java
+++ b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/ControlMetricsWebResource.java
@@ -18,9 +18,10 @@
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.commons.lang3.StringUtils;
+import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.cpman.ControlLoad;
+import org.onosproject.cpman.ControlLoadSnapshot;
 import org.onosproject.cpman.ControlMetricType;
 import org.onosproject.cpman.ControlPlaneMonitorService;
 import org.onosproject.net.DeviceId;
@@ -34,6 +35,8 @@
 import javax.ws.rs.core.Response;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.onosproject.cpman.ControlResource.CONTROL_MESSAGE_METRICS;
 import static org.onosproject.cpman.ControlResource.CPU_METRICS;
@@ -55,6 +58,7 @@
     private final ClusterService clusterService = get(ClusterService.class);
     private final NodeId localNodeId = clusterService.getLocalNode().id();
     private final ObjectNode root = mapper().createObjectNode();
+    private static final long TIMEOUT_MILLIS = 1000;
 
     /**
      * Returns control message metrics of all devices.
@@ -248,31 +252,51 @@
         if (name == null && did == null) {
             typeSet.forEach(type -> {
                 ObjectNode metricNode = mapper().createObjectNode();
-                ControlLoad load = service.getLocalLoad(type, Optional.ofNullable(null));
-                if (load != null) {
-                    metricNode.set(toCamelCase(type.toString(), true), codec(ControlLoad.class)
-                            .encode(service.getLocalLoad(type, Optional.ofNullable(null)), this));
-                    metricsNode.add(metricNode);
+                CompletableFuture<ControlLoadSnapshot> cf =
+                        service.getLoad(nodeId, type, Optional.empty());
+
+                if (cf != null) {
+                    ControlLoadSnapshot cmr =
+                            Tools.futureGetOrElse(cf, TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, null);
+
+                    if (cmr != null) {
+                        metricNode.set(toCamelCase(type.toString(), true),
+                                codec(ControlLoadSnapshot.class).encode(cmr, this));
+                        metricsNode.add(metricNode);
+                    }
                 }
             });
         } else if (name == null) {
             typeSet.forEach(type -> {
                 ObjectNode metricNode = mapper().createObjectNode();
-                ControlLoad load = service.getLocalLoad(type, Optional.of(did));
-                if (load != null) {
-                    metricNode.set(toCamelCase(type.toString(), true),
-                            codec(ControlLoad.class).encode(load, this));
-                    metricsNode.add(metricNode);
+                CompletableFuture<ControlLoadSnapshot> cf =
+                        service.getLoad(nodeId, type, Optional.of(did));
+
+                if (cf != null) {
+                    ControlLoadSnapshot cmr =
+                            Tools.futureGetOrElse(cf, TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, null);
+                    if (cmr != null) {
+                        metricNode.set(toCamelCase(type.toString(), true),
+                                codec(ControlLoadSnapshot.class).encode(cmr, this));
+                        metricsNode.add(metricNode);
+                    }
                 }
+
             });
         } else if (did == null) {
             typeSet.forEach(type -> {
                 ObjectNode metricNode = mapper().createObjectNode();
-                ControlLoad load = service.getLocalLoad(type, name);
-                if (load != null) {
-                    metricNode.set(toCamelCase(type.toString(), true),
-                            codec(ControlLoad.class).encode(load, this));
-                    metricsNode.add(metricNode);
+                CompletableFuture<ControlLoadSnapshot> cf =
+                        service.getLoad(nodeId, type, name);
+
+                if (cf != null) {
+                    ControlLoadSnapshot cmr =
+                            Tools.futureGetOrElse(cf, TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, null);
+                    if (cmr != null) {
+                        metricNode.set(toCamelCase(type.toString(), true),
+                                codec(ControlLoadSnapshot.class).encode(cmr, this));
+                        metricsNode.add(metricNode);
+                    }
                 }
             });
         }
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java
index c96d1d9..5dd9413 100644
--- a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java
+++ b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java
@@ -106,31 +106,31 @@
             long cpuIdleTime = nullIsIllegal(jsonTree.get("cpuIdleTime").asLong(), INVALID_REQUEST);
 
             aggregator.setMetricsService(metricsService);
-            aggregator.addMetrics(Optional.ofNullable(null), SYSTEM_TYPE);
+            aggregator.addMetrics(Optional.empty(), SYSTEM_TYPE);
 
             cm = new ControlMetric(ControlMetricType.CPU_LOAD,
                     new MetricValue.Builder().load(cpuLoad).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.CPU_LOAD, cpuLoad);
 
             cm = new ControlMetric(ControlMetricType.TOTAL_CPU_TIME,
                     new MetricValue.Builder().load(totalCpuTime).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.TOTAL_CPU_TIME, totalCpuTime);
 
             cm = new ControlMetric(ControlMetricType.SYS_CPU_TIME,
                     new MetricValue.Builder().load(sysCpuTime).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.SYS_CPU_TIME, sysCpuTime);
 
             cm = new ControlMetric(ControlMetricType.USER_CPU_TIME,
                     new MetricValue.Builder().load(userCpuTime).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.USER_CPU_TIME, userCpuTime);
 
             cm = new ControlMetric(ControlMetricType.CPU_IDLE_TIME,
                     new MetricValue.Builder().load(cpuIdleTime).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.CPU_IDLE_TIME, cpuIdleTime);
 
         } catch (IOException e) {
@@ -167,26 +167,26 @@
             long memFreeRatio = memTotal == 0L ? 0L : (memFree * PERCENT_CONSTANT) / memTotal;
 
             aggregator.setMetricsService(metricsService);
-            aggregator.addMetrics(Optional.ofNullable(null), SYSTEM_TYPE);
+            aggregator.addMetrics(Optional.empty(), SYSTEM_TYPE);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_USED_RATIO,
                     new MetricValue.Builder().load(memUsedRatio).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.MEMORY_USED_RATIO, memUsedRatio);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_FREE_RATIO,
                     new MetricValue.Builder().load(memFreeRatio).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.MEMORY_FREE_RATIO, memFreeRatio);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_USED,
                     new MetricValue.Builder().load(memUsed).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.MEMORY_USED, memUsed);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_FREE,
                     new MetricValue.Builder().load(memFree).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.MEMORY_FREE, memFree);
 
         } catch (IOException e) {