blob: 11898994ddb8d6756ce867f707dc8fd850fa25ea [file] [log] [blame]
Jian Li60804322015-12-02 14:46:31 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Jian Li60804322015-12-02 14:46:31 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jian Li6b86a762016-01-29 09:30:40 -080016package org.onosproject.cpman.impl;
Jian Li60804322015-12-02 14:46:31 -080017
Jian Li9f3a8852016-04-07 13:37:39 -070018import com.google.common.collect.ImmutableMap;
Jian Li7d180c52016-02-01 21:53:08 -080019import com.google.common.collect.ImmutableSet;
Jian Li85060ac2016-02-04 09:58:56 -080020import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
Jian Li60804322015-12-02 14:46:31 -080022import org.apache.felix.scr.annotations.Activate;
Jian Lic132c112016-01-28 20:27:34 -080023import org.apache.felix.scr.annotations.Component;
Jian Li60804322015-12-02 14:46:31 -080024import org.apache.felix.scr.annotations.Deactivate;
Jian Li7d180c52016-02-01 21:53:08 -080025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
Jian Li60804322015-12-02 14:46:31 -080027import org.apache.felix.scr.annotations.Service;
Jian Li23906cc2016-03-31 11:16:44 -070028import org.onlab.util.KryoNamespace;
Jian Li7d180c52016-02-01 21:53:08 -080029import org.onosproject.cluster.ClusterService;
Jian Li60804322015-12-02 14:46:31 -080030import org.onosproject.cluster.NodeId;
Jian Li6b86a762016-01-29 09:30:40 -080031import org.onosproject.cpman.ControlLoad;
Jian Li67e1e152016-04-18 17:52:58 -070032import org.onosproject.cpman.ControlLoadSnapshot;
Jian Li6b86a762016-01-29 09:30:40 -080033import org.onosproject.cpman.ControlMetric;
34import org.onosproject.cpman.ControlMetricType;
Jian Li67e1e152016-04-18 17:52:58 -070035import org.onosproject.cpman.ControlMetricsRequest;
Jian Li6b86a762016-01-29 09:30:40 -080036import org.onosproject.cpman.ControlPlaneMonitorService;
Jian Li89eeccd2016-05-06 02:10:33 -070037import org.onosproject.cpman.ControlResource;
38import org.onosproject.cpman.ControlResourceRequest;
Jian Li7d180c52016-02-01 21:53:08 -080039import org.onosproject.cpman.MetricsDatabase;
Jian Li60804322015-12-02 14:46:31 -080040import org.onosproject.net.DeviceId;
Jian Li23906cc2016-03-31 11:16:44 -070041import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
42import org.onosproject.store.cluster.messaging.MessageSubject;
43import org.onosproject.store.serializers.KryoNamespaces;
44import org.onosproject.store.service.Serializer;
Jian Li60804322015-12-02 14:46:31 -080045import org.slf4j.Logger;
Jian Li7d180c52016-02-01 21:53:08 -080046import org.slf4j.LoggerFactory;
Jian Li60804322015-12-02 14:46:31 -080047
Jian Li7d180c52016-02-01 21:53:08 -080048import java.util.Map;
Jian Li60804322015-12-02 14:46:31 -080049import java.util.Optional;
Jian Lic5cb4a12016-02-03 23:24:42 -080050import java.util.Set;
Jian Li23906cc2016-03-31 11:16:44 -070051import java.util.concurrent.CompletableFuture;
Jian Li67e1e152016-04-18 17:52:58 -070052import java.util.concurrent.TimeUnit;
Jian Li85060ac2016-02-04 09:58:56 -080053import java.util.stream.Collectors;
Jian Li60804322015-12-02 14:46:31 -080054
Jian Li23906cc2016-03-31 11:16:44 -070055import static com.google.common.base.Preconditions.checkArgument;
Jian Li23906cc2016-03-31 11:16:44 -070056import static org.onosproject.cpman.ControlResource.CONTROL_MESSAGE_METRICS;
57import static org.onosproject.cpman.ControlResource.CPU_METRICS;
58import static org.onosproject.cpman.ControlResource.DISK_METRICS;
59import static org.onosproject.cpman.ControlResource.MEMORY_METRICS;
60import static org.onosproject.cpman.ControlResource.NETWORK_METRICS;
61import static org.onosproject.cpman.ControlResource.Type;
Jian Li60804322015-12-02 14:46:31 -080062
63/**
64 * Control plane monitoring service class.
65 */
66@Component(immediate = true)
67@Service
68public class ControlPlaneMonitor implements ControlPlaneMonitorService {
69
Jian Li7d180c52016-02-01 21:53:08 -080070 private final Logger log = LoggerFactory.getLogger(getClass());
71 private MetricsDatabase cpuMetrics;
72 private MetricsDatabase memoryMetrics;
73 private Map<DeviceId, MetricsDatabase> controlMessageMap;
74 private Map<String, MetricsDatabase> diskMetricsMap;
75 private Map<String, MetricsDatabase> networkMetricsMap;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected ClusterService clusterService;
79
Jian Li23906cc2016-03-31 11:16:44 -070080 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected ClusterCommunicationService communicationService;
82
Jian Lidaf55ea2016-04-04 20:38:30 -070083 private static final String DEFAULT_RESOURCE = "default";
84
Jian Li85060ac2016-02-04 09:58:56 -080085 private static final Set RESOURCE_TYPE_SET =
86 ImmutableSet.of(Type.CONTROL_MESSAGE, Type.DISK, Type.NETWORK);
Jian Li7d180c52016-02-01 21:53:08 -080087
Jian Li23906cc2016-03-31 11:16:44 -070088 private static final MessageSubject CONTROL_STATS =
Jian Li67e1e152016-04-18 17:52:58 -070089 new MessageSubject("control-plane-stats");
Jian Li23906cc2016-03-31 11:16:44 -070090
Jian Li89eeccd2016-05-06 02:10:33 -070091 private static final MessageSubject CONTROL_RESOURCE =
92 new MessageSubject("control-plane-resources");
93
Jian Li7d180c52016-02-01 21:53:08 -080094 private Map<ControlMetricType, Double> cpuBuf;
95 private Map<ControlMetricType, Double> memoryBuf;
96 private Map<String, Map<ControlMetricType, Double>> diskBuf;
97 private Map<String, Map<ControlMetricType, Double>> networkBuf;
98 private Map<DeviceId, Map<ControlMetricType, Double>> ctrlMsgBuf;
Jian Li60804322015-12-02 14:46:31 -080099
Jian Li85060ac2016-02-04 09:58:56 -0800100 private Map<Type, Set<String>> availableResourceMap;
101 private Set<DeviceId> availableDeviceIdSet;
102
Jian Li23906cc2016-03-31 11:16:44 -0700103 private static final String METRIC_TYPE_NULL = "Control metric type cannot be null";
Jian Li89eeccd2016-05-06 02:10:33 -0700104 private static final String RESOURCE_TYPE_NULL = "Control resource type cannot be null";
Jian Li23906cc2016-03-31 11:16:44 -0700105
106 private static final Serializer SERIALIZER = Serializer
107 .using(new KryoNamespace.Builder()
108 .register(KryoNamespaces.API)
109 .register(ControlMetricsRequest.class)
Jian Li89eeccd2016-05-06 02:10:33 -0700110 .register(ControlResourceRequest.class)
Jian Li67e1e152016-04-18 17:52:58 -0700111 .register(ControlLoadSnapshot.class)
Jian Li23906cc2016-03-31 11:16:44 -0700112 .register(ControlMetricType.class)
Jian Li89eeccd2016-05-06 02:10:33 -0700113 .register(ControlResource.Type.class)
Jian Li67e1e152016-04-18 17:52:58 -0700114 .register(TimeUnit.class)
Jian Li23906cc2016-03-31 11:16:44 -0700115 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());
116
Jian Li60804322015-12-02 14:46:31 -0800117 @Activate
118 public void activate() {
Jian Lidaf55ea2016-04-04 20:38:30 -0700119 cpuMetrics = genMDbBuilder(DEFAULT_RESOURCE, Type.CPU, CPU_METRICS);
120 memoryMetrics = genMDbBuilder(DEFAULT_RESOURCE, Type.MEMORY, MEMORY_METRICS);
Jian Li85060ac2016-02-04 09:58:56 -0800121 controlMessageMap = Maps.newConcurrentMap();
122 diskMetricsMap = Maps.newConcurrentMap();
123 networkMetricsMap = Maps.newConcurrentMap();
Jian Li7d180c52016-02-01 21:53:08 -0800124
Jian Li85060ac2016-02-04 09:58:56 -0800125 cpuBuf = Maps.newConcurrentMap();
126 memoryBuf = Maps.newConcurrentMap();
127 diskBuf = Maps.newConcurrentMap();
128 networkBuf = Maps.newConcurrentMap();
129 ctrlMsgBuf = Maps.newConcurrentMap();
130
131 availableResourceMap = Maps.newConcurrentMap();
132 availableDeviceIdSet = Sets.newConcurrentHashSet();
Jian Li7d180c52016-02-01 21:53:08 -0800133
Jian Li67e1e152016-04-18 17:52:58 -0700134 communicationService.<ControlMetricsRequest, ControlLoadSnapshot>addSubscriber(CONTROL_STATS,
Jian Li89eeccd2016-05-06 02:10:33 -0700135 SERIALIZER::decode, this::handleMetricsRequest, SERIALIZER::encode);
136
137 communicationService.<ControlResourceRequest, Set<String>>addSubscriber(CONTROL_RESOURCE,
138 SERIALIZER::decode, this::handleResourceRequest, SERIALIZER::encode);
Jian Li23906cc2016-03-31 11:16:44 -0700139
Jian Li7d180c52016-02-01 21:53:08 -0800140 log.info("Started");
Jian Li60804322015-12-02 14:46:31 -0800141 }
142
143 @Deactivate
144 public void deactivate() {
Jian Li7d180c52016-02-01 21:53:08 -0800145
146 // TODO: need to handle the mdb close.
147 cpuBuf.clear();
148 memoryBuf.clear();
149 diskBuf.clear();
150 networkBuf.clear();
151 ctrlMsgBuf.clear();
152
Jian Li23906cc2016-03-31 11:16:44 -0700153 communicationService.removeSubscriber(CONTROL_STATS);
Jian Li89eeccd2016-05-06 02:10:33 -0700154 communicationService.removeSubscriber(CONTROL_RESOURCE);
Jian Li23906cc2016-03-31 11:16:44 -0700155
Jian Li7d180c52016-02-01 21:53:08 -0800156 log.info("Stopped");
Jian Li60804322015-12-02 14:46:31 -0800157 }
158
Jian Li60804322015-12-02 14:46:31 -0800159 @Override
Jian Lic5cb4a12016-02-03 23:24:42 -0800160 public void updateMetric(ControlMetric cm, int updateIntervalInMinutes,
Jian Li60804322015-12-02 14:46:31 -0800161 Optional<DeviceId> deviceId) {
Jian Li7d180c52016-02-01 21:53:08 -0800162 if (deviceId.isPresent()) {
Jian Li46148902016-01-29 13:33:50 -0800163
Jian Li7d180c52016-02-01 21:53:08 -0800164 // insert a new device entry if we cannot find any
Jian Li85060ac2016-02-04 09:58:56 -0800165 ctrlMsgBuf.putIfAbsent(deviceId.get(), Maps.newConcurrentMap());
Jian Li7d180c52016-02-01 21:53:08 -0800166
167 // update control message metrics
Jian Li85060ac2016-02-04 09:58:56 -0800168 if (CONTROL_MESSAGE_METRICS.contains(cm.metricType())) {
169
170 if (!availableDeviceIdSet.contains(deviceId.get())) {
171 availableDeviceIdSet.add(deviceId.get());
172 }
Jian Li7d180c52016-02-01 21:53:08 -0800173
174 // we will accumulate the metric value into buffer first
175 ctrlMsgBuf.get(deviceId.get()).putIfAbsent(cm.metricType(),
176 (double) cm.metricValue().getLoad());
177
178 // if buffer contains all control message metrics,
179 // we simply set and update the values into MetricsDatabase.
Jian Li85060ac2016-02-04 09:58:56 -0800180 if (ctrlMsgBuf.get(deviceId.get()).keySet()
181 .containsAll(CONTROL_MESSAGE_METRICS)) {
Jian Li7d180c52016-02-01 21:53:08 -0800182 updateControlMessages(ctrlMsgBuf.get(deviceId.get()), deviceId.get());
Jian Li1aa07822016-04-19 17:58:02 -0700183 ctrlMsgBuf.clear();
Jian Li7d180c52016-02-01 21:53:08 -0800184 }
185 }
186 } else {
187
188 // update cpu metrics
189 if (CPU_METRICS.contains(cm.metricType())) {
190 cpuBuf.putIfAbsent(cm.metricType(),
191 (double) cm.metricValue().getLoad());
192 if (cpuBuf.keySet().containsAll(CPU_METRICS)) {
193 cpuMetrics.updateMetrics(convertMap(cpuBuf));
194 cpuBuf.clear();
195 }
196 }
197
198 // update memory metrics
199 if (MEMORY_METRICS.contains(cm.metricType())) {
200 memoryBuf.putIfAbsent(cm.metricType(),
201 (double) cm.metricValue().getLoad());
202 if (memoryBuf.keySet().containsAll(MEMORY_METRICS)) {
203 memoryMetrics.updateMetrics(convertMap(memoryBuf));
204 memoryBuf.clear();
205 }
206 }
207 }
Jian Li60804322015-12-02 14:46:31 -0800208 }
209
210 @Override
Jian Lic5cb4a12016-02-03 23:24:42 -0800211 public void updateMetric(ControlMetric cm, int updateIntervalInMinutes,
Jian Lie044d1a2016-01-25 09:01:20 -0800212 String resourceName) {
Jian Li7d180c52016-02-01 21:53:08 -0800213 // update disk metrics
214 if (DISK_METRICS.contains(cm.metricType())) {
Jian Li85060ac2016-02-04 09:58:56 -0800215 diskBuf.putIfAbsent(resourceName, Maps.newConcurrentMap());
216
217 availableResourceMap.putIfAbsent(Type.DISK, Sets.newHashSet());
218 availableResourceMap.computeIfPresent(Type.DISK, (k, v) -> {
219 v.add(resourceName);
220 return v;
221 });
222
Jian Li7d180c52016-02-01 21:53:08 -0800223 diskBuf.get(resourceName).putIfAbsent(cm.metricType(),
224 (double) cm.metricValue().getLoad());
225 if (diskBuf.get(resourceName).keySet().containsAll(DISK_METRICS)) {
226 updateDiskMetrics(diskBuf.get(resourceName), resourceName);
227 diskBuf.clear();
228 }
229 }
Jian Lie044d1a2016-01-25 09:01:20 -0800230
Jian Li7d180c52016-02-01 21:53:08 -0800231 // update network metrics
232 if (NETWORK_METRICS.contains(cm.metricType())) {
Jian Li85060ac2016-02-04 09:58:56 -0800233 networkBuf.putIfAbsent(resourceName, Maps.newConcurrentMap());
234
235 availableResourceMap.putIfAbsent(Type.NETWORK, Sets.newHashSet());
236 availableResourceMap.computeIfPresent(Type.NETWORK, (k, v) -> {
237 v.add(resourceName);
238 return v;
239 });
240
Jian Li7d180c52016-02-01 21:53:08 -0800241 networkBuf.get(resourceName).putIfAbsent(cm.metricType(),
242 (double) cm.metricValue().getLoad());
243 if (networkBuf.get(resourceName).keySet().containsAll(NETWORK_METRICS)) {
244 updateNetworkMetrics(networkBuf.get(resourceName), resourceName);
245 networkBuf.clear();
246 }
247 }
Jian Lie044d1a2016-01-25 09:01:20 -0800248 }
249
250 @Override
Jian Li67e1e152016-04-18 17:52:58 -0700251 public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
252 ControlMetricType type,
253 Optional<DeviceId> deviceId) {
254 if (clusterService.getLocalNode().id().equals(nodeId)) {
255 return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, deviceId)));
256 } else {
Jian Li89eeccd2016-05-06 02:10:33 -0700257 return communicationService.sendAndReceive(createMetricsRequest(type, deviceId),
Jian Li67e1e152016-04-18 17:52:58 -0700258 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
259 }
260 }
261
262 @Override
263 public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
264 ControlMetricType type,
265 String resourceName) {
266 if (clusterService.getLocalNode().id().equals(nodeId)) {
267 return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, resourceName)));
268 } else {
Jian Li89eeccd2016-05-06 02:10:33 -0700269 return communicationService.sendAndReceive(createMetricsRequest(type, resourceName),
Jian Li67e1e152016-04-18 17:52:58 -0700270 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
271 }
272 }
273
274 @Override
275 public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
276 ControlMetricType type,
277 int duration, TimeUnit unit,
278 Optional<DeviceId> deviceId) {
279 if (clusterService.getLocalNode().id().equals(nodeId)) {
280 return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, deviceId), duration, unit));
281 } else {
Jian Li89eeccd2016-05-06 02:10:33 -0700282 return communicationService.sendAndReceive(createMetricsRequest(type, duration, unit, deviceId),
Jian Li67e1e152016-04-18 17:52:58 -0700283 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
284 }
285 }
286
287 @Override
288 public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
289 ControlMetricType type,
290 int duration, TimeUnit unit,
291 String resourceName) {
292 if (clusterService.getLocalNode().id().equals(nodeId)) {
293 return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, resourceName), duration, unit));
294 } else {
Jian Li89eeccd2016-05-06 02:10:33 -0700295 return communicationService.sendAndReceive(createMetricsRequest(type, duration, unit, resourceName),
Jian Li67e1e152016-04-18 17:52:58 -0700296 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
297 }
298 }
299
300 @Override
Jian Li89eeccd2016-05-06 02:10:33 -0700301 public CompletableFuture<Set<String>> availableResources(NodeId nodeId,
302 Type resourceType) {
303 if (clusterService.getLocalNode().id().equals(nodeId)) {
304 Set<String> resources = getLocalAvailableResources(resourceType);
305 return CompletableFuture.completedFuture(resources);
306 } else {
307 return communicationService.sendAndReceive(createResourceRequest(resourceType),
308 CONTROL_RESOURCE, SERIALIZER::encode, SERIALIZER::decode, nodeId);
Jian Li67e1e152016-04-18 17:52:58 -0700309 }
Jian Li67e1e152016-04-18 17:52:58 -0700310 }
311
312 /**
313 * Builds and returns metric database instance with given resource name,
314 * resource type and metric type.
315 *
316 * @param resourceName resource name
317 * @param resourceType resource type
318 * @param metricTypes metric type
319 * @return metric database instance
320 */
321 private MetricsDatabase genMDbBuilder(String resourceName,
322 Type resourceType,
323 Set<ControlMetricType> metricTypes) {
324 MetricsDatabase.Builder builder = new DefaultMetricsDatabase.Builder();
325 builder.withMetricName(resourceType.toString());
326 builder.withResourceName(resourceName);
327 metricTypes.forEach(type -> builder.addMetricType(type.toString()));
328 return builder.build();
329 }
330
331 /**
332 * Updates network metrics with given metric map and resource name.
333 *
334 * @param metricMap a metric map which is comprised of metric type and value
335 * @param resourceName resource name
336 */
337 private void updateNetworkMetrics(Map<ControlMetricType, Double> metricMap,
338 String resourceName) {
Jian Li1aa07822016-04-19 17:58:02 -0700339 if (!networkMetricsMap.containsKey(resourceName)) {
340 networkMetricsMap.put(resourceName, genMDbBuilder(resourceName,
341 Type.NETWORK, NETWORK_METRICS));
342 }
Jian Li67e1e152016-04-18 17:52:58 -0700343 networkMetricsMap.get(resourceName).updateMetrics(convertMap(metricMap));
344 }
345
346 /**
347 * Updates disk metrics with given metric map and resource name.
348 *
349 * @param metricMap a metric map which is comprised of metric type and value
350 * @param resourceName resource name
351 */
352 private void updateDiskMetrics(Map<ControlMetricType, Double> metricMap,
353 String resourceName) {
Jian Li1aa07822016-04-19 17:58:02 -0700354 if (!diskMetricsMap.containsKey(resourceName)) {
355 diskMetricsMap.put(resourceName, genMDbBuilder(resourceName,
356 Type.DISK, DISK_METRICS));
357 }
Jian Li67e1e152016-04-18 17:52:58 -0700358 diskMetricsMap.get(resourceName).updateMetrics(convertMap(metricMap));
359 }
360
361 /**
362 * Updates control message metrics with given metric map and device identifier.
363 *
364 * @param metricMap a metric map which is comprised of metric type and value
365 * @param deviceId device identifier
366 */
367 private void updateControlMessages(Map<ControlMetricType, Double> metricMap,
368 DeviceId deviceId) {
Jian Li1aa07822016-04-19 17:58:02 -0700369 if (!controlMessageMap.containsKey(deviceId)) {
370 controlMessageMap.put(deviceId, genMDbBuilder(deviceId.toString(),
371 Type.CONTROL_MESSAGE, CONTROL_MESSAGE_METRICS));
372 }
Jian Li67e1e152016-04-18 17:52:58 -0700373 controlMessageMap.get(deviceId).updateMetrics(convertMap(metricMap));
374 }
375
376 /**
377 * Converts metric map into a new map which contains string formatted metric type as key.
378 *
379 * @param metricMap metric map in which ControlMetricType is key
380 * @return a new map in which string formatted metric type is key
381 */
382 private Map<String, Double> convertMap(Map<ControlMetricType, Double> metricMap) {
383 if (metricMap == null) {
384 return ImmutableMap.of();
385 }
386 Map newMap = Maps.newConcurrentMap();
387 metricMap.forEach((k, v) -> newMap.putIfAbsent(k.toString(), v));
388 return newMap;
389 }
390
391 /**
392 * Handles control metric request from remote node.
393 *
394 * @param request control metric request
395 * @return completable future object of control load snapshot
396 */
Jian Li89eeccd2016-05-06 02:10:33 -0700397 private CompletableFuture<ControlLoadSnapshot>
398 handleMetricsRequest(ControlMetricsRequest request) {
Jian Li67e1e152016-04-18 17:52:58 -0700399
400 checkArgument(request.getType() != null, METRIC_TYPE_NULL);
401
402 ControlLoad load;
403 if (request.getResourceName() != null && request.getUnit() != null) {
404 load = getLocalLoad(request.getType(), request.getResourceName());
405 } else {
406 load = getLocalLoad(request.getType(), request.getDeviceId());
407 }
408
409 long average;
410 if (request.getUnit() != null) {
411 average = load.average(request.getDuration(), request.getUnit());
412 } else {
413 average = load.average();
414 }
415 ControlLoadSnapshot resp =
416 new ControlLoadSnapshot(load.latest(), average, load.time());
417 return CompletableFuture.completedFuture(resp);
418 }
419
420 /**
Jian Li89eeccd2016-05-06 02:10:33 -0700421 * Handles control resource request from remote node.
422 *
423 * @param request control resource type
424 * @return completable future object of control resource set
425 */
426 private CompletableFuture<Set<String>>
427 handleResourceRequest(ControlResourceRequest request) {
428
429 checkArgument(request.getType() != null, RESOURCE_TYPE_NULL);
430
431 Set<String> resources = getLocalAvailableResources(request.getType());
432 return CompletableFuture.completedFuture(resources);
433 }
434
435 /**
Jian Li67e1e152016-04-18 17:52:58 -0700436 * Generates a control metric request.
437 *
438 * @param type control metric type
439 * @param deviceId device identifier
440 * @return control metric request instance
441 */
Jian Li89eeccd2016-05-06 02:10:33 -0700442 private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
443 Optional<DeviceId> deviceId) {
Jian Li67e1e152016-04-18 17:52:58 -0700444 return new ControlMetricsRequest(type, deviceId);
445 }
446
447 /**
448 * Generates a control metric request with given projected time range.
449 *
450 * @param type control metric type
451 * @param duration projected time duration
452 * @param unit projected time unit
453 * @param deviceId device identifier
454 * @return control metric request instance
455 */
Jian Li89eeccd2016-05-06 02:10:33 -0700456 private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
457 int duration, TimeUnit unit,
458 Optional<DeviceId> deviceId) {
Jian Li67e1e152016-04-18 17:52:58 -0700459 return new ControlMetricsRequest(type, duration, unit, deviceId);
460 }
461
462 /**
463 * Generates a control metric request.
464 *
465 * @param type control metric type
466 * @param resourceName resource name
467 * @return control metric request instance
468 */
Jian Li89eeccd2016-05-06 02:10:33 -0700469 private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
470 String resourceName) {
Jian Li67e1e152016-04-18 17:52:58 -0700471 return new ControlMetricsRequest(type, resourceName);
472 }
473
474 /**
475 * Generates a control metric request with given projected time range.
476 *
477 * @param type control metric type
478 * @param duration projected time duration
479 * @param unit projected time unit
480 * @param resourceName resource name
481 * @return control metric request instance
482 */
Jian Li89eeccd2016-05-06 02:10:33 -0700483 private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
484 int duration, TimeUnit unit,
485 String resourceName) {
Jian Li67e1e152016-04-18 17:52:58 -0700486 return new ControlMetricsRequest(type, duration, unit, resourceName);
487 }
488
489 /**
Jian Li89eeccd2016-05-06 02:10:33 -0700490 * Generates a control resource request with given resource type.
491 *
492 * @param type control resource type
493 * @return control resource request instance
494 */
495 private ControlResourceRequest createResourceRequest(ControlResource.Type type) {
496 return new ControlResourceRequest(type);
497 }
498
499 /**
Jian Li67e1e152016-04-18 17:52:58 -0700500 * Returns a snapshot of control load.
501 *
502 * @param cl control load
503 * @return a snapshot of control load
504 */
505 private ControlLoadSnapshot snapshot(ControlLoad cl) {
506 if (cl != null) {
507 return new ControlLoadSnapshot(cl.latest(), cl.average(), cl.time());
508 }
509 return null;
510 }
511
512 /**
513 * Returns a snapshot of control load with given projected time range.
514 *
515 * @param cl control load
516 * @param duration projected time duration
517 * @param unit projected time unit
518 * @return a snapshot of control load
519 */
520 private ControlLoadSnapshot snapshot(ControlLoad cl, int duration, TimeUnit unit) {
521 if (cl != null) {
Jian Li1aa07822016-04-19 17:58:02 -0700522
523 return new ControlLoadSnapshot(cl.latest(), cl.average(duration, unit),
524 cl.time(), cl.recent(duration, unit));
Jian Li67e1e152016-04-18 17:52:58 -0700525 }
526 return null;
527 }
528
529 /**
530 * Returns local control load.
531 *
532 * @param type metric type
533 * @param deviceId device identifier
534 * @return control load
535 */
536 private ControlLoad getLocalLoad(ControlMetricType type,
537 Optional<DeviceId> deviceId) {
Jian Li23906cc2016-03-31 11:16:44 -0700538 if (deviceId.isPresent()) {
Jian Li67e1e152016-04-18 17:52:58 -0700539 // returns control message stats
Jian Li23906cc2016-03-31 11:16:44 -0700540 if (CONTROL_MESSAGE_METRICS.contains(type) &&
541 availableDeviceIdSet.contains(deviceId.get())) {
542 return new DefaultControlLoad(controlMessageMap.get(deviceId.get()), type);
Jian Li7d180c52016-02-01 21:53:08 -0800543 }
544 } else {
Jian Li23906cc2016-03-31 11:16:44 -0700545 // returns controlLoad of CPU metrics
546 if (CPU_METRICS.contains(type)) {
547 return new DefaultControlLoad(cpuMetrics, type);
548 }
549
550 // returns memoryLoad of memory metrics
551 if (MEMORY_METRICS.contains(type)) {
552 return new DefaultControlLoad(memoryMetrics, type);
553 }
Jian Li7d180c52016-02-01 21:53:08 -0800554 }
Jian Li60804322015-12-02 14:46:31 -0800555 return null;
556 }
557
Jian Li67e1e152016-04-18 17:52:58 -0700558 /**
559 * Returns local control load.
560 *
561 * @param type metric type
562 * @param resourceName resource name
563 * @return control load
564 */
565 private ControlLoad getLocalLoad(ControlMetricType type, String resourceName) {
Jian Li89eeccd2016-05-06 02:10:33 -0700566 NodeId localNodeId = clusterService.getLocalNode().id();
567
Jian Li67e1e152016-04-18 17:52:58 -0700568 // returns disk I/O stats
Jian Li23906cc2016-03-31 11:16:44 -0700569 if (DISK_METRICS.contains(type) &&
Jian Li89eeccd2016-05-06 02:10:33 -0700570 availableResourcesSync(localNodeId, Type.DISK).contains(resourceName)) {
Jian Li23906cc2016-03-31 11:16:44 -0700571 return new DefaultControlLoad(diskMetricsMap.get(resourceName), type);
572 }
Jian Li7d180c52016-02-01 21:53:08 -0800573
Jian Li67e1e152016-04-18 17:52:58 -0700574 // returns network I/O stats
Jian Li23906cc2016-03-31 11:16:44 -0700575 if (NETWORK_METRICS.contains(type) &&
Jian Li89eeccd2016-05-06 02:10:33 -0700576 availableResourcesSync(localNodeId, Type.NETWORK).contains(resourceName)) {
Jian Li23906cc2016-03-31 11:16:44 -0700577 return new DefaultControlLoad(networkMetricsMap.get(resourceName), type);
Jian Li7d180c52016-02-01 21:53:08 -0800578 }
Jian Li60804322015-12-02 14:46:31 -0800579 return null;
580 }
Jian Li89eeccd2016-05-06 02:10:33 -0700581
582 /**
583 * Obtains the available resource list from local node.
584 *
585 * @param resourceType control resource type
586 * @return a set of available control resource
587 */
588 private Set<String> getLocalAvailableResources(Type resourceType) {
589 Set<String> resources = ImmutableSet.of();
590 if (RESOURCE_TYPE_SET.contains(resourceType)) {
591 if (Type.CONTROL_MESSAGE.equals(resourceType)) {
592 resources = ImmutableSet.copyOf(availableDeviceIdSet.stream()
593 .map(DeviceId::toString).collect(Collectors.toSet()));
594 } else {
595 Set<String> res = availableResourceMap.get(resourceType);
596 resources = res == null ? ImmutableSet.of() : res;
597 }
598 }
599 return resources;
600 }
Jian Li23906cc2016-03-31 11:16:44 -0700601}