Enable to query the control metrics from remote node

Change-Id: Ifef1c6eafd7cc79ed99be51f7faa26d97aeb2f67
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/cli/ControlMetricsStatsListCommand.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/cli/ControlMetricsStatsListCommand.java
index d4253f9..adf65a9 100644
--- a/apps/cpman/app/src/main/java/org/onosproject/cpman/cli/ControlMetricsStatsListCommand.java
+++ b/apps/cpman/app/src/main/java/org/onosproject/cpman/cli/ControlMetricsStatsListCommand.java
@@ -17,16 +17,18 @@
 
 import org.apache.karaf.shell.commands.Argument;
 import org.apache.karaf.shell.commands.Command;
+import org.onlab.util.Tools;
 import org.onosproject.cli.AbstractShellCommand;
-import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.cpman.ControlLoad;
+import org.onosproject.cpman.ControlLoadSnapshot;
 import org.onosproject.cpman.ControlMetricType;
 import org.onosproject.cpman.ControlPlaneMonitorService;
 import org.onosproject.net.DeviceId;
 
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.onosproject.cpman.ControlResource.CONTROL_MESSAGE_METRICS;
 import static org.onosproject.cpman.ControlResource.CPU_METRICS;
@@ -45,20 +47,25 @@
             "averageValue=%d, latestTime=%s";
     private static final String INVALID_TYPE = "Invalid control resource type.";
 
-    @Argument(index = 0, name = "type",
+    private static final long TIMEOUT_MILLIS = 3000;
+
+    @Argument(index = 0, name = "node", description = "ONOS node identifier",
+            required = true, multiValued = false)
+    String node = null;
+
+    @Argument(index = 1, name = "type",
             description = "Resource type (cpu|memory|disk|network|control_message)",
             required = true, multiValued = false)
     String type = null;
 
-    @Argument(index = 1, name = "name", description = "Resource name (or Device Id)",
+    @Argument(index = 2, name = "name", description = "Resource name (or Device Id)",
             required = false, multiValued = false)
     String name = null;
 
     @Override
     protected void execute() {
         ControlPlaneMonitorService service = get(ControlPlaneMonitorService.class);
-        ClusterService clusterService = get(ClusterService.class);
-        NodeId nodeId = clusterService.getLocalNode().id();
+        NodeId nodeId = NodeId.nodeId(node);
         switch (type) {
             case "cpu":
                 printMetricsStats(service, nodeId, CPU_METRICS);
@@ -74,7 +81,8 @@
                 break;
             case "control_message":
                 if (name != null) {
-                    printMetricsStats(service, nodeId, CONTROL_MESSAGE_METRICS, DeviceId.deviceId(name));
+                    printMetricsStats(service, nodeId, CONTROL_MESSAGE_METRICS,
+                            DeviceId.deviceId(name));
                 }
                 break;
             default:
@@ -89,8 +97,8 @@
     }
 
     private void printMetricsStats(ControlPlaneMonitorService service, NodeId nodeId,
-                                   Set<ControlMetricType> typeSet, String name) {
-        printMetricsStats(service, nodeId, typeSet, name, null);
+                                   Set<ControlMetricType> typeSet, String resName) {
+        printMetricsStats(service, nodeId, typeSet, resName, null);
     }
 
     private void printMetricsStats(ControlPlaneMonitorService service, NodeId nodeId,
@@ -99,19 +107,39 @@
     }
 
     private void printMetricsStats(ControlPlaneMonitorService service, NodeId nodeId,
-                                   Set<ControlMetricType> typeSet, String name, DeviceId did) {
-        if (name == null && did == null) {
-            typeSet.forEach(s -> print(s, service.getLocalLoad(s, Optional.ofNullable(null))));
-        } else if (name == null && did != null) {
-            typeSet.forEach(s -> print(s, service.getLocalLoad(s, Optional.of(did))));
-        } else if (name != null && did == null) {
-            typeSet.forEach(s -> print(s, service.getLocalLoad(s, name)));
+                                   Set<ControlMetricType> typeSet, String resName, DeviceId did) {
+        if (resName == null && did == null) {
+            typeSet.forEach(s -> {
+                CompletableFuture<ControlLoadSnapshot> cf =
+                        service.getLoad(nodeId, s, Optional.empty());
+                ControlLoadSnapshot cmr =
+                        Tools.futureGetOrElse(cf, TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, null);
+                printRemote(s, cmr);
+            });
+        } else if (resName == null && did != null) {
+            typeSet.forEach(s -> {
+                CompletableFuture<ControlLoadSnapshot> cf =
+                        service.getLoad(nodeId, s, Optional.of(did));
+                ControlLoadSnapshot cmr =
+                        Tools.futureGetOrElse(cf, TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, null);
+                printRemote(s, cmr);
+            });
+        } else if (resName != null && did == null) {
+            typeSet.forEach(s -> {
+                CompletableFuture<ControlLoadSnapshot> cf =
+                        service.getLoad(nodeId, s, resName);
+                ControlLoadSnapshot cmr =
+                        Tools.futureGetOrElse(cf, TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, null);
+                printRemote(s, cmr);
+            });
         }
     }
 
-    private void print(ControlMetricType type, ControlLoad cl) {
-        if (cl != null) {
-            print(FMT, type.toString(), cl.latest(), cl.average(), cl.time());
+    private void printRemote(ControlMetricType cmType, ControlLoadSnapshot cls) {
+        if (cls != null) {
+            print(FMT, cmType.toString(), cls.latest(), cls.average(), cls.time());
+        } else {
+            print("Failed to retrieve metric value for type {}", cmType.toString());
         }
     }
 }
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/codec/ControlLoadCodec.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/codec/ControlLoadSnapshotCodec.java
similarity index 70%
rename from apps/cpman/app/src/main/java/org/onosproject/cpman/codec/ControlLoadCodec.java
rename to apps/cpman/app/src/main/java/org/onosproject/cpman/codec/ControlLoadSnapshotCodec.java
index fdd080b..7caf3ba 100644
--- a/apps/cpman/app/src/main/java/org/onosproject/cpman/codec/ControlLoadCodec.java
+++ b/apps/cpman/app/src/main/java/org/onosproject/cpman/codec/ControlLoadSnapshotCodec.java
@@ -18,22 +18,22 @@
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.onosproject.codec.CodecContext;
 import org.onosproject.codec.JsonCodec;
-import org.onosproject.cpman.ControlLoad;
+import org.onosproject.cpman.ControlLoadSnapshot;
 
 /**
- * Control load codec.
+ * Control load snapshot codec.
  */
-public final class ControlLoadCodec extends JsonCodec<ControlLoad> {
+public final class ControlLoadSnapshotCodec extends JsonCodec<ControlLoadSnapshot> {
 
     private static final String TIME = "time";
     private static final String LATEST = "latest";
     private static final String AVERAGE = "average";
 
     @Override
-    public ObjectNode encode(ControlLoad controlLoad, CodecContext context) {
+    public ObjectNode encode(ControlLoadSnapshot controlLoadSnapshot, CodecContext context) {
         return context.mapper().createObjectNode()
-                .put(TIME, controlLoad.time())
-                .put(LATEST, controlLoad.latest())
-                .put(AVERAGE, controlLoad.average());
+                .put(TIME, controlLoadSnapshot.time())
+                .put(LATEST, controlLoadSnapshot.latest())
+                .put(AVERAGE, controlLoadSnapshot.average());
     }
 }
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/ControlMetricsRequest.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/ControlMetricsRequest.java
deleted file mode 100644
index 1e87958..0000000
--- a/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/ControlMetricsRequest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.cpman.impl;
-
-import com.google.common.base.MoreObjects;
-import org.onosproject.cpman.ControlMetricType;
-import org.onosproject.net.DeviceId;
-
-import java.util.Objects;
-import java.util.Optional;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-/**
- * A container class that is used to request control metric of remote node.
- */
-public class ControlMetricsRequest {
-    private final ControlMetricType type;
-    private Optional<DeviceId> deviceId;
-    private String resourceName;
-
-    /**
-     * Instantiates a new control metric request with the given control metric
-     * type and device identifier.
-     *
-     * @param type     control metric type
-     * @param deviceId device identifier
-     */
-    public ControlMetricsRequest(ControlMetricType type, Optional<DeviceId> deviceId) {
-        this.type = type;
-        this.deviceId = deviceId;
-    }
-
-    /**
-     * Instantiates a new control metric request with the given control metric
-     * type and resource name.
-     *
-     * @param type         control metric type
-     * @param resourceName resource name
-     */
-    public ControlMetricsRequest(ControlMetricType type, String resourceName) {
-        this.type = type;
-        this.resourceName = resourceName;
-    }
-
-    /**
-     * Obtains control metric type.
-     *
-     * @return control metric type
-     */
-    public ControlMetricType getType() {
-        return type;
-    }
-
-    /**
-     * Obtains resource name.
-     *
-     * @return resource name
-     */
-    public String getResourceName() {
-        return resourceName;
-    }
-
-    /**
-     * Obtains device identifier.
-     *
-     * @return device identifier
-     */
-    public Optional<DeviceId> getDeviceId() {
-        return deviceId;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(type, deviceId, resourceName);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj instanceof ControlMetricsRequest) {
-            final ControlMetricsRequest other = (ControlMetricsRequest) obj;
-            return Objects.equals(this.type, other.type) &&
-                    Objects.equals(this.deviceId, other.deviceId) &&
-                    Objects.equals(this.resourceName, other.resourceName);
-        }
-        return false;
-    }
-
-    @Override
-    public String toString() {
-        MoreObjects.ToStringHelper helper;
-        helper = toStringHelper(this)
-                .add("type", type)
-                .add("resourceName", resourceName);
-        if (deviceId != null) {
-            helper.add("deviceId", deviceId.get());
-        }
-        return helper.toString();
-    }
-}
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/ControlPlaneMonitor.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/ControlPlaneMonitor.java
index 68981a6..d32d2e0 100644
--- a/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/ControlPlaneMonitor.java
+++ b/apps/cpman/app/src/main/java/org/onosproject/cpman/impl/ControlPlaneMonitor.java
@@ -29,8 +29,10 @@
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cpman.ControlLoad;
+import org.onosproject.cpman.ControlLoadSnapshot;
 import org.onosproject.cpman.ControlMetric;
 import org.onosproject.cpman.ControlMetricType;
+import org.onosproject.cpman.ControlMetricsRequest;
 import org.onosproject.cpman.ControlPlaneMonitorService;
 import org.onosproject.cpman.MetricsDatabase;
 import org.onosproject.net.DeviceId;
@@ -45,12 +47,10 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.cpman.ControlResource.CONTROL_MESSAGE_METRICS;
 import static org.onosproject.cpman.ControlResource.CPU_METRICS;
 import static org.onosproject.cpman.ControlResource.DISK_METRICS;
@@ -84,7 +84,7 @@
             ImmutableSet.of(Type.CONTROL_MESSAGE, Type.DISK, Type.NETWORK);
 
     private static final MessageSubject CONTROL_STATS =
-                         new MessageSubject("control-plane-stats");
+            new MessageSubject("control-plane-stats");
 
     private Map<ControlMetricType, Double> cpuBuf;
     private Map<ControlMetricType, Double> memoryBuf;
@@ -95,19 +95,16 @@
     private Map<Type, Set<String>> availableResourceMap;
     private Set<DeviceId> availableDeviceIdSet;
 
-    private ExecutorService messageHandlingExecutor;
-
     private static final String METRIC_TYPE_NULL = "Control metric type cannot be null";
 
-    Set<Map<ControlMetricType, Double>> debugSets = Sets.newHashSet();
-
     private static final Serializer SERIALIZER = Serializer
             .using(new KryoNamespace.Builder()
                     .register(KryoNamespaces.API)
+                    .register(KryoNamespaces.BASIC)
                     .register(ControlMetricsRequest.class)
-                    .register(DefaultControlLoad.class)
-                    .register(DefaultMetricsDatabase.class)
+                    .register(ControlLoadSnapshot.class)
                     .register(ControlMetricType.class)
+                    .register(TimeUnit.class)
                     .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());
 
     @Activate
@@ -127,11 +124,8 @@
         availableResourceMap = Maps.newConcurrentMap();
         availableDeviceIdSet = Sets.newConcurrentHashSet();
 
-        messageHandlingExecutor = Executors.newSingleThreadScheduledExecutor(
-                groupedThreads("onos/app/cpman", "message-handlers", log));
-
-        communicationService.addSubscriber(CONTROL_STATS,
-                SERIALIZER::decode, this::handleRequest, messageHandlingExecutor);
+        communicationService.<ControlMetricsRequest, ControlLoadSnapshot>addSubscriber(CONTROL_STATS,
+                SERIALIZER::decode, this::handleRequest, SERIALIZER::encode);
 
         log.info("Started");
     }
@@ -243,9 +237,263 @@
     }
 
     @Override
-    public ControlLoad getLocalLoad(ControlMetricType type,
-                                    Optional<DeviceId> deviceId) {
+    public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
+                                                          ControlMetricType type,
+                                                          Optional<DeviceId> deviceId) {
+        if (clusterService.getLocalNode().id().equals(nodeId)) {
+            return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, deviceId)));
+        } else {
+            return communicationService.sendAndReceive(createRequest(type, deviceId),
+                    CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
+        }
+    }
+
+    @Override
+    public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
+                                                          ControlMetricType type,
+                                                          String resourceName) {
+        if (clusterService.getLocalNode().id().equals(nodeId)) {
+            return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, resourceName)));
+        } else {
+            return communicationService.sendAndReceive(createRequest(type, resourceName),
+                    CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
+        }
+    }
+
+    @Override
+    public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
+                                                          ControlMetricType type,
+                                                          int duration, TimeUnit unit,
+                                                          Optional<DeviceId> deviceId) {
+        if (clusterService.getLocalNode().id().equals(nodeId)) {
+            return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, deviceId), duration, unit));
+        } else {
+            return communicationService.sendAndReceive(createRequest(type, duration, unit, deviceId),
+                    CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
+        }
+    }
+
+    @Override
+    public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
+                                                          ControlMetricType type,
+                                                          int duration, TimeUnit unit,
+                                                          String resourceName) {
+        if (clusterService.getLocalNode().id().equals(nodeId)) {
+            return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, resourceName), duration, unit));
+        } else {
+            return communicationService.sendAndReceive(createRequest(type, duration, unit, resourceName),
+                    CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
+        }
+    }
+
+    @Override
+    public Set<String> availableResources(Type resourceType) {
+        if (RESOURCE_TYPE_SET.contains(resourceType)) {
+            if (Type.CONTROL_MESSAGE.equals(resourceType)) {
+                return availableDeviceIdSet.stream().map(id ->
+                        id.toString()).collect(Collectors.toSet());
+            } else {
+                Set<String> res = availableResourceMap.get(resourceType);
+                return res == null ? ImmutableSet.of() : res;
+            }
+        }
+        return ImmutableSet.of();
+    }
+
+    /**
+     * Builds and returns metric database instance with given resource name,
+     * resource type and metric type.
+     *
+     * @param resourceName resource name
+     * @param resourceType resource type
+     * @param metricTypes  metric type
+     * @return metric database instance
+     */
+    private MetricsDatabase genMDbBuilder(String resourceName,
+                                          Type resourceType,
+                                          Set<ControlMetricType> metricTypes) {
+        MetricsDatabase.Builder builder = new DefaultMetricsDatabase.Builder();
+        builder.withMetricName(resourceType.toString());
+        builder.withResourceName(resourceName);
+        metricTypes.forEach(type -> builder.addMetricType(type.toString()));
+        return builder.build();
+    }
+
+    /**
+     * Updates network metrics with given metric map and resource name.
+     *
+     * @param metricMap    a metric map which is comprised of metric type and value
+     * @param resourceName resource name
+     */
+    private void updateNetworkMetrics(Map<ControlMetricType, Double> metricMap,
+                                      String resourceName) {
+        networkMetricsMap.putIfAbsent(resourceName, genMDbBuilder(resourceName,
+                Type.NETWORK, NETWORK_METRICS));
+        networkMetricsMap.get(resourceName).updateMetrics(convertMap(metricMap));
+    }
+
+    /**
+     * Updates disk metrics with given metric map and resource name.
+     *
+     * @param metricMap    a metric map which is comprised of metric type and value
+     * @param resourceName resource name
+     */
+    private void updateDiskMetrics(Map<ControlMetricType, Double> metricMap,
+                                   String resourceName) {
+        diskMetricsMap.putIfAbsent(resourceName, genMDbBuilder(resourceName,
+                Type.DISK, DISK_METRICS));
+        diskMetricsMap.get(resourceName).updateMetrics(convertMap(metricMap));
+    }
+
+    /**
+     * Updates control message metrics with given metric map and device identifier.
+     *
+     * @param metricMap a metric map which is comprised of metric type and value
+     * @param deviceId  device identifier
+     */
+    private void updateControlMessages(Map<ControlMetricType, Double> metricMap,
+                                       DeviceId deviceId) {
+        controlMessageMap.putIfAbsent(deviceId, genMDbBuilder(deviceId.toString(),
+                Type.CONTROL_MESSAGE, CONTROL_MESSAGE_METRICS));
+        controlMessageMap.get(deviceId).updateMetrics(convertMap(metricMap));
+    }
+
+    /**
+     * Converts metric map into a new map which contains string formatted metric type as key.
+     *
+     * @param metricMap metric map in which ControlMetricType is key
+     * @return a new map in which string formatted metric type is key
+     */
+    private Map<String, Double> convertMap(Map<ControlMetricType, Double> metricMap) {
+        if (metricMap == null) {
+            return ImmutableMap.of();
+        }
+        Map newMap = Maps.newConcurrentMap();
+        metricMap.forEach((k, v) -> newMap.putIfAbsent(k.toString(), v));
+        return newMap;
+    }
+
+    /**
+     * Handles control metric request from remote node.
+     *
+     * @param request control metric request
+     * @return completable future object of control load snapshot
+     */
+    private CompletableFuture<ControlLoadSnapshot> handleRequest(ControlMetricsRequest request) {
+
+        checkArgument(request.getType() != null, METRIC_TYPE_NULL);
+
+        ControlLoad load;
+        if (request.getResourceName() != null && request.getUnit() != null) {
+            load = getLocalLoad(request.getType(), request.getResourceName());
+        } else {
+            load = getLocalLoad(request.getType(), request.getDeviceId());
+        }
+
+        long average;
+        if (request.getUnit() != null) {
+            average = load.average(request.getDuration(), request.getUnit());
+        } else {
+            average = load.average();
+        }
+        ControlLoadSnapshot resp =
+                new ControlLoadSnapshot(load.latest(), average, load.time());
+        return CompletableFuture.completedFuture(resp);
+    }
+
+    /**
+     * Generates a control metric request.
+     *
+     * @param type     control metric type
+     * @param deviceId device identifier
+     * @return control metric request instance
+     */
+    private ControlMetricsRequest createRequest(ControlMetricType type,
+                                                Optional<DeviceId> deviceId) {
+        return new ControlMetricsRequest(type, deviceId);
+    }
+
+    /**
+     * Generates a control metric request with given projected time range.
+     *
+     * @param type     control metric type
+     * @param duration projected time duration
+     * @param unit     projected time unit
+     * @param deviceId device identifier
+     * @return control metric request instance
+     */
+    private ControlMetricsRequest createRequest(ControlMetricType type,
+                                                int duration, TimeUnit unit,
+                                                Optional<DeviceId> deviceId) {
+        return new ControlMetricsRequest(type, duration, unit, deviceId);
+    }
+
+    /**
+     * Generates a control metric request.
+     *
+     * @param type         control metric type
+     * @param resourceName resource name
+     * @return control metric request instance
+     */
+    private ControlMetricsRequest createRequest(ControlMetricType type,
+                                                String resourceName) {
+        return new ControlMetricsRequest(type, resourceName);
+    }
+
+    /**
+     * Generates a control metric request with given projected time range.
+     *
+     * @param type         control metric type
+     * @param duration     projected time duration
+     * @param unit         projected time unit
+     * @param resourceName resource name
+     * @return control metric request instance
+     */
+    private ControlMetricsRequest createRequest(ControlMetricType type,
+                                                int duration, TimeUnit unit,
+                                                String resourceName) {
+        return new ControlMetricsRequest(type, duration, unit, resourceName);
+    }
+
+    /**
+     * Returns a snapshot of control load.
+     *
+     * @param cl control load
+     * @return a snapshot of control load
+     */
+    private ControlLoadSnapshot snapshot(ControlLoad cl) {
+        if (cl != null) {
+            return new ControlLoadSnapshot(cl.latest(), cl.average(), cl.time());
+        }
+        return null;
+    }
+
+    /**
+     * Returns a snapshot of control load with given projected time range.
+     *
+     * @param cl       control load
+     * @param duration projected time duration
+     * @param unit     projected time unit
+     * @return a snapshot of control load
+     */
+    private ControlLoadSnapshot snapshot(ControlLoad cl, int duration, TimeUnit unit) {
+        if (cl != null) {
+            return new ControlLoadSnapshot(cl.latest(), cl.average(duration, unit), cl.time());
+        }
+        return null;
+    }
+
+    /**
+     * Returns local control load.
+     *
+     * @param type     metric type
+     * @param deviceId device identifier
+     * @return control load
+     */
+    private ControlLoad getLocalLoad(ControlMetricType type,
+                                     Optional<DeviceId> deviceId) {
         if (deviceId.isPresent()) {
+            // returns control message stats
             if (CONTROL_MESSAGE_METRICS.contains(type) &&
                     availableDeviceIdSet.contains(deviceId.get())) {
                 return new DefaultControlLoad(controlMessageMap.get(deviceId.get()), type);
@@ -264,111 +512,25 @@
         return null;
     }
 
-    @Override
-    public ControlLoad getLocalLoad(ControlMetricType type, String resourceName) {
+    /**
+     * Returns local control load.
+     *
+     * @param type         metric type
+     * @param resourceName resource name
+     * @return control load
+     */
+    private ControlLoad getLocalLoad(ControlMetricType type, String resourceName) {
+        // returns disk I/O stats
         if (DISK_METRICS.contains(type) &&
                 availableResources(Type.DISK).contains(resourceName)) {
             return new DefaultControlLoad(diskMetricsMap.get(resourceName), type);
         }
 
+        // returns network I/O stats
         if (NETWORK_METRICS.contains(type) &&
                 availableResources(Type.NETWORK).contains(resourceName)) {
             return new DefaultControlLoad(networkMetricsMap.get(resourceName), type);
         }
         return null;
     }
-
-    @Override
-    public CompletableFuture<ControlLoad> getRemoteLoad(NodeId nodeId,
-                                                        ControlMetricType type,
-                                                        Optional<DeviceId> deviceId) {
-        return communicationService.sendAndReceive(createRequest(type, deviceId),
-                CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
-    }
-
-    @Override
-    public CompletableFuture<ControlLoad> getRemoteLoad(NodeId nodeId,
-                                                        ControlMetricType type,
-                                                        String resourceName) {
-        return communicationService.sendAndReceive(createRequest(type, resourceName),
-                CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
-    }
-
-
-    @Override
-    public Set<String> availableResources(Type resourceType) {
-        if (RESOURCE_TYPE_SET.contains(resourceType)) {
-            if (Type.CONTROL_MESSAGE.equals(resourceType)) {
-                return availableDeviceIdSet.stream().map(id ->
-                        id.toString()).collect(Collectors.toSet());
-            } else {
-                Set<String> res = availableResourceMap.get(resourceType);
-                return res == null ? ImmutableSet.of() : res;
-            }
-        }
-        return ImmutableSet.of();
-    }
-
-    private MetricsDatabase genMDbBuilder(String resourceName,
-                                          Type resourceType,
-                                          Set<ControlMetricType> metricTypes) {
-        MetricsDatabase.Builder builder = new DefaultMetricsDatabase.Builder();
-        builder.withMetricName(resourceType.toString());
-        builder.withResourceName(resourceName);
-        metricTypes.forEach(type -> builder.addMetricType(type.toString()));
-        return builder.build();
-    }
-
-    private void updateNetworkMetrics(Map<ControlMetricType, Double> metricMap,
-                                      String resName) {
-        networkMetricsMap.putIfAbsent(resName, genMDbBuilder(resName,
-                Type.NETWORK, NETWORK_METRICS));
-        networkMetricsMap.get(resName).updateMetrics(convertMap(metricMap));
-    }
-
-    private void updateDiskMetrics(Map<ControlMetricType, Double> metricMap,
-                                   String resName) {
-        diskMetricsMap.putIfAbsent(resName, genMDbBuilder(resName,
-                Type.DISK, DISK_METRICS));
-        diskMetricsMap.get(resName).updateMetrics(convertMap(metricMap));
-    }
-
-    private void updateControlMessages(Map<ControlMetricType, Double> metricMap,
-                                       DeviceId devId) {
-        controlMessageMap.putIfAbsent(devId, genMDbBuilder(devId.toString(),
-                Type.CONTROL_MESSAGE, CONTROL_MESSAGE_METRICS));
-        controlMessageMap.get(devId).updateMetrics(convertMap(metricMap));
-    }
-
-    private Map convertMap(Map<ControlMetricType, Double> map) {
-        if (map == null) {
-            return ImmutableMap.of();
-        }
-        Map newMap = Maps.newConcurrentMap();
-        map.forEach((k, v) -> newMap.putIfAbsent(k.toString(), v));
-        return newMap;
-    }
-
-    private CompletableFuture<ControlLoad> handleRequest(ControlMetricsRequest request) {
-
-        checkArgument(request.getType() != null, METRIC_TYPE_NULL);
-
-        ControlLoad load;
-        if (request.getResourceName() != null) {
-            load = getLocalLoad(request.getType(), request.getResourceName());
-        } else {
-            load = getLocalLoad(request.getType(), request.getDeviceId());
-        }
-        return CompletableFuture.completedFuture(load);
-    }
-
-    private ControlMetricsRequest createRequest(ControlMetricType type,
-                                                Optional<DeviceId> deviceId) {
-        return new ControlMetricsRequest(type, deviceId);
-    }
-
-    private ControlMetricsRequest createRequest(ControlMetricType type,
-                                                String resourceName) {
-        return new ControlMetricsRequest(type, resourceName);
-    }
 }
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/CPManCodecRegistrator.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/CPManCodecRegistrator.java
index aa7709c..5afad2e 100644
--- a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/CPManCodecRegistrator.java
+++ b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/CPManCodecRegistrator.java
@@ -21,8 +21,8 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onosproject.codec.CodecService;
-import org.onosproject.cpman.ControlLoad;
-import org.onosproject.cpman.codec.ControlLoadCodec;
+import org.onosproject.cpman.ControlLoadSnapshot;
+import org.onosproject.cpman.codec.ControlLoadSnapshotCodec;
 import org.slf4j.Logger;
 
 import static org.slf4j.LoggerFactory.getLogger;
@@ -40,14 +40,14 @@
 
     @Activate
     public void activate() {
-        codecService.registerCodec(ControlLoad.class, new ControlLoadCodec());
+        codecService.registerCodec(ControlLoadSnapshot.class, new ControlLoadSnapshotCodec());
 
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
-        codecService.unregisterCodec(ControlLoad.class);
+        codecService.unregisterCodec(ControlLoadSnapshot.class);
 
         log.info("Stopped");
     }
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/ControlMetricsWebResource.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/ControlMetricsWebResource.java
index a5d0baf..5e985b0 100644
--- a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/ControlMetricsWebResource.java
+++ b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/ControlMetricsWebResource.java
@@ -18,9 +18,10 @@
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.commons.lang3.StringUtils;
+import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.cpman.ControlLoad;
+import org.onosproject.cpman.ControlLoadSnapshot;
 import org.onosproject.cpman.ControlMetricType;
 import org.onosproject.cpman.ControlPlaneMonitorService;
 import org.onosproject.net.DeviceId;
@@ -34,6 +35,8 @@
 import javax.ws.rs.core.Response;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.onosproject.cpman.ControlResource.CONTROL_MESSAGE_METRICS;
 import static org.onosproject.cpman.ControlResource.CPU_METRICS;
@@ -55,6 +58,7 @@
     private final ClusterService clusterService = get(ClusterService.class);
     private final NodeId localNodeId = clusterService.getLocalNode().id();
     private final ObjectNode root = mapper().createObjectNode();
+    private static final long TIMEOUT_MILLIS = 1000;
 
     /**
      * Returns control message metrics of all devices.
@@ -248,31 +252,51 @@
         if (name == null && did == null) {
             typeSet.forEach(type -> {
                 ObjectNode metricNode = mapper().createObjectNode();
-                ControlLoad load = service.getLocalLoad(type, Optional.ofNullable(null));
-                if (load != null) {
-                    metricNode.set(toCamelCase(type.toString(), true), codec(ControlLoad.class)
-                            .encode(service.getLocalLoad(type, Optional.ofNullable(null)), this));
-                    metricsNode.add(metricNode);
+                CompletableFuture<ControlLoadSnapshot> cf =
+                        service.getLoad(nodeId, type, Optional.empty());
+
+                if (cf != null) {
+                    ControlLoadSnapshot cmr =
+                            Tools.futureGetOrElse(cf, TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, null);
+
+                    if (cmr != null) {
+                        metricNode.set(toCamelCase(type.toString(), true),
+                                codec(ControlLoadSnapshot.class).encode(cmr, this));
+                        metricsNode.add(metricNode);
+                    }
                 }
             });
         } else if (name == null) {
             typeSet.forEach(type -> {
                 ObjectNode metricNode = mapper().createObjectNode();
-                ControlLoad load = service.getLocalLoad(type, Optional.of(did));
-                if (load != null) {
-                    metricNode.set(toCamelCase(type.toString(), true),
-                            codec(ControlLoad.class).encode(load, this));
-                    metricsNode.add(metricNode);
+                CompletableFuture<ControlLoadSnapshot> cf =
+                        service.getLoad(nodeId, type, Optional.of(did));
+
+                if (cf != null) {
+                    ControlLoadSnapshot cmr =
+                            Tools.futureGetOrElse(cf, TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, null);
+                    if (cmr != null) {
+                        metricNode.set(toCamelCase(type.toString(), true),
+                                codec(ControlLoadSnapshot.class).encode(cmr, this));
+                        metricsNode.add(metricNode);
+                    }
                 }
+
             });
         } else if (did == null) {
             typeSet.forEach(type -> {
                 ObjectNode metricNode = mapper().createObjectNode();
-                ControlLoad load = service.getLocalLoad(type, name);
-                if (load != null) {
-                    metricNode.set(toCamelCase(type.toString(), true),
-                            codec(ControlLoad.class).encode(load, this));
-                    metricsNode.add(metricNode);
+                CompletableFuture<ControlLoadSnapshot> cf =
+                        service.getLoad(nodeId, type, name);
+
+                if (cf != null) {
+                    ControlLoadSnapshot cmr =
+                            Tools.futureGetOrElse(cf, TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, null);
+                    if (cmr != null) {
+                        metricNode.set(toCamelCase(type.toString(), true),
+                                codec(ControlLoadSnapshot.class).encode(cmr, this));
+                        metricsNode.add(metricNode);
+                    }
                 }
             });
         }
diff --git a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java
index c96d1d9..5dd9413 100644
--- a/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java
+++ b/apps/cpman/app/src/main/java/org/onosproject/cpman/rest/SystemMetricsCollectorWebResource.java
@@ -106,31 +106,31 @@
             long cpuIdleTime = nullIsIllegal(jsonTree.get("cpuIdleTime").asLong(), INVALID_REQUEST);
 
             aggregator.setMetricsService(metricsService);
-            aggregator.addMetrics(Optional.ofNullable(null), SYSTEM_TYPE);
+            aggregator.addMetrics(Optional.empty(), SYSTEM_TYPE);
 
             cm = new ControlMetric(ControlMetricType.CPU_LOAD,
                     new MetricValue.Builder().load(cpuLoad).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.CPU_LOAD, cpuLoad);
 
             cm = new ControlMetric(ControlMetricType.TOTAL_CPU_TIME,
                     new MetricValue.Builder().load(totalCpuTime).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.TOTAL_CPU_TIME, totalCpuTime);
 
             cm = new ControlMetric(ControlMetricType.SYS_CPU_TIME,
                     new MetricValue.Builder().load(sysCpuTime).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.SYS_CPU_TIME, sysCpuTime);
 
             cm = new ControlMetric(ControlMetricType.USER_CPU_TIME,
                     new MetricValue.Builder().load(userCpuTime).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.USER_CPU_TIME, userCpuTime);
 
             cm = new ControlMetric(ControlMetricType.CPU_IDLE_TIME,
                     new MetricValue.Builder().load(cpuIdleTime).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.CPU_IDLE_TIME, cpuIdleTime);
 
         } catch (IOException e) {
@@ -167,26 +167,26 @@
             long memFreeRatio = memTotal == 0L ? 0L : (memFree * PERCENT_CONSTANT) / memTotal;
 
             aggregator.setMetricsService(metricsService);
-            aggregator.addMetrics(Optional.ofNullable(null), SYSTEM_TYPE);
+            aggregator.addMetrics(Optional.empty(), SYSTEM_TYPE);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_USED_RATIO,
                     new MetricValue.Builder().load(memUsedRatio).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.MEMORY_USED_RATIO, memUsedRatio);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_FREE_RATIO,
                     new MetricValue.Builder().load(memFreeRatio).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.MEMORY_FREE_RATIO, memFreeRatio);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_USED,
                     new MetricValue.Builder().load(memUsed).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.MEMORY_USED, memUsed);
 
             cm = new ControlMetric(ControlMetricType.MEMORY_FREE,
                     new MetricValue.Builder().load(memFree).add());
-            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.ofNullable(null));
+            monitorService.updateMetric(cm, UPDATE_INTERVAL_IN_MINUTE, Optional.empty());
             aggregator.increment(ControlMetricType.MEMORY_FREE, memFree);
 
         } catch (IOException e) {