blob: ea2dabcaaf6fa611997996031a18c34e0638a160 [file] [log] [blame]
Jian Li60804322015-12-02 14:46:31 -08001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Jian Li60804322015-12-02 14:46:31 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jian Li6b86a762016-01-29 09:30:40 -080016package org.onosproject.cpman.impl;
Jian Li60804322015-12-02 14:46:31 -080017
Jian Li9f3a8852016-04-07 13:37:39 -070018import com.google.common.collect.ImmutableMap;
Jian Li7d180c52016-02-01 21:53:08 -080019import com.google.common.collect.ImmutableSet;
Jian Li85060ac2016-02-04 09:58:56 -080020import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
Jian Li23906cc2016-03-31 11:16:44 -070022import org.onlab.util.KryoNamespace;
Jian Li7d180c52016-02-01 21:53:08 -080023import org.onosproject.cluster.ClusterService;
Jian Li60804322015-12-02 14:46:31 -080024import org.onosproject.cluster.NodeId;
Jian Li6b86a762016-01-29 09:30:40 -080025import org.onosproject.cpman.ControlLoad;
Jian Li67e1e152016-04-18 17:52:58 -070026import org.onosproject.cpman.ControlLoadSnapshot;
Jian Li6b86a762016-01-29 09:30:40 -080027import org.onosproject.cpman.ControlMetric;
28import org.onosproject.cpman.ControlMetricType;
Jian Li67e1e152016-04-18 17:52:58 -070029import org.onosproject.cpman.ControlMetricsRequest;
Jian Li6b86a762016-01-29 09:30:40 -080030import org.onosproject.cpman.ControlPlaneMonitorService;
Jian Li89eeccd2016-05-06 02:10:33 -070031import org.onosproject.cpman.ControlResource;
32import org.onosproject.cpman.ControlResourceRequest;
Jian Li7d180c52016-02-01 21:53:08 -080033import org.onosproject.cpman.MetricsDatabase;
Jian Li60804322015-12-02 14:46:31 -080034import org.onosproject.net.DeviceId;
Jian Li23906cc2016-03-31 11:16:44 -070035import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
36import org.onosproject.store.cluster.messaging.MessageSubject;
37import org.onosproject.store.serializers.KryoNamespaces;
38import org.onosproject.store.service.Serializer;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070039import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
41import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Reference;
43import org.osgi.service.component.annotations.ReferenceCardinality;
Jian Li60804322015-12-02 14:46:31 -080044import org.slf4j.Logger;
Jian Li7d180c52016-02-01 21:53:08 -080045import org.slf4j.LoggerFactory;
Jian Li60804322015-12-02 14:46:31 -080046
Jian Li7d180c52016-02-01 21:53:08 -080047import java.util.Map;
Jian Li60804322015-12-02 14:46:31 -080048import java.util.Optional;
Jian Lic5cb4a12016-02-03 23:24:42 -080049import java.util.Set;
Jian Li23906cc2016-03-31 11:16:44 -070050import java.util.concurrent.CompletableFuture;
Jian Li67e1e152016-04-18 17:52:58 -070051import java.util.concurrent.TimeUnit;
Jian Li85060ac2016-02-04 09:58:56 -080052import java.util.stream.Collectors;
Jian Li60804322015-12-02 14:46:31 -080053
Jian Li23906cc2016-03-31 11:16:44 -070054import static com.google.common.base.Preconditions.checkArgument;
Jian Li23906cc2016-03-31 11:16:44 -070055import static org.onosproject.cpman.ControlResource.CONTROL_MESSAGE_METRICS;
56import static org.onosproject.cpman.ControlResource.CPU_METRICS;
57import static org.onosproject.cpman.ControlResource.DISK_METRICS;
58import static org.onosproject.cpman.ControlResource.MEMORY_METRICS;
59import static org.onosproject.cpman.ControlResource.NETWORK_METRICS;
60import static org.onosproject.cpman.ControlResource.Type;
Jian Li60804322015-12-02 14:46:31 -080061
62/**
63 * Control plane monitoring service class.
64 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070065@Component(immediate = true, service = ControlPlaneMonitorService.class)
Jian Li60804322015-12-02 14:46:31 -080066public class ControlPlaneMonitor implements ControlPlaneMonitorService {
67
Jian Li7d180c52016-02-01 21:53:08 -080068 private final Logger log = LoggerFactory.getLogger(getClass());
69 private MetricsDatabase cpuMetrics;
70 private MetricsDatabase memoryMetrics;
71 private Map<DeviceId, MetricsDatabase> controlMessageMap;
72 private Map<String, MetricsDatabase> diskMetricsMap;
73 private Map<String, MetricsDatabase> networkMetricsMap;
74
Ray Milkeyd84f89b2018-08-17 14:54:17 -070075 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li7d180c52016-02-01 21:53:08 -080076 protected ClusterService clusterService;
77
Ray Milkeyd84f89b2018-08-17 14:54:17 -070078 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li23906cc2016-03-31 11:16:44 -070079 protected ClusterCommunicationService communicationService;
80
Jian Lidaf55ea2016-04-04 20:38:30 -070081 private static final String DEFAULT_RESOURCE = "default";
82
Jian Li85060ac2016-02-04 09:58:56 -080083 private static final Set RESOURCE_TYPE_SET =
84 ImmutableSet.of(Type.CONTROL_MESSAGE, Type.DISK, Type.NETWORK);
Jian Li7d180c52016-02-01 21:53:08 -080085
Jian Li23906cc2016-03-31 11:16:44 -070086 private static final MessageSubject CONTROL_STATS =
Jian Li67e1e152016-04-18 17:52:58 -070087 new MessageSubject("control-plane-stats");
Jian Li23906cc2016-03-31 11:16:44 -070088
Jian Li89eeccd2016-05-06 02:10:33 -070089 private static final MessageSubject CONTROL_RESOURCE =
90 new MessageSubject("control-plane-resources");
91
Jian Li7d180c52016-02-01 21:53:08 -080092 private Map<ControlMetricType, Double> cpuBuf;
93 private Map<ControlMetricType, Double> memoryBuf;
94 private Map<String, Map<ControlMetricType, Double>> diskBuf;
95 private Map<String, Map<ControlMetricType, Double>> networkBuf;
96 private Map<DeviceId, Map<ControlMetricType, Double>> ctrlMsgBuf;
Jian Li60804322015-12-02 14:46:31 -080097
Jian Li85060ac2016-02-04 09:58:56 -080098 private Map<Type, Set<String>> availableResourceMap;
99 private Set<DeviceId> availableDeviceIdSet;
100
Jian Li23906cc2016-03-31 11:16:44 -0700101 private static final String METRIC_TYPE_NULL = "Control metric type cannot be null";
Jian Li89eeccd2016-05-06 02:10:33 -0700102 private static final String RESOURCE_TYPE_NULL = "Control resource type cannot be null";
Jian Li23906cc2016-03-31 11:16:44 -0700103
104 private static final Serializer SERIALIZER = Serializer
105 .using(new KryoNamespace.Builder()
106 .register(KryoNamespaces.API)
107 .register(ControlMetricsRequest.class)
Jian Li89eeccd2016-05-06 02:10:33 -0700108 .register(ControlResourceRequest.class)
Jian Li67e1e152016-04-18 17:52:58 -0700109 .register(ControlLoadSnapshot.class)
Jian Li23906cc2016-03-31 11:16:44 -0700110 .register(ControlMetricType.class)
Jian Li89eeccd2016-05-06 02:10:33 -0700111 .register(ControlResource.Type.class)
Jian Li67e1e152016-04-18 17:52:58 -0700112 .register(TimeUnit.class)
Jian Li23906cc2016-03-31 11:16:44 -0700113 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());
114
Jian Li60804322015-12-02 14:46:31 -0800115 @Activate
116 public void activate() {
Jian Lidaf55ea2016-04-04 20:38:30 -0700117 cpuMetrics = genMDbBuilder(DEFAULT_RESOURCE, Type.CPU, CPU_METRICS);
118 memoryMetrics = genMDbBuilder(DEFAULT_RESOURCE, Type.MEMORY, MEMORY_METRICS);
Jian Li85060ac2016-02-04 09:58:56 -0800119 controlMessageMap = Maps.newConcurrentMap();
120 diskMetricsMap = Maps.newConcurrentMap();
121 networkMetricsMap = Maps.newConcurrentMap();
Jian Li7d180c52016-02-01 21:53:08 -0800122
Jian Li85060ac2016-02-04 09:58:56 -0800123 cpuBuf = Maps.newConcurrentMap();
124 memoryBuf = Maps.newConcurrentMap();
125 diskBuf = Maps.newConcurrentMap();
126 networkBuf = Maps.newConcurrentMap();
127 ctrlMsgBuf = Maps.newConcurrentMap();
128
129 availableResourceMap = Maps.newConcurrentMap();
130 availableDeviceIdSet = Sets.newConcurrentHashSet();
Jian Li7d180c52016-02-01 21:53:08 -0800131
Jian Li67e1e152016-04-18 17:52:58 -0700132 communicationService.<ControlMetricsRequest, ControlLoadSnapshot>addSubscriber(CONTROL_STATS,
Jian Li89eeccd2016-05-06 02:10:33 -0700133 SERIALIZER::decode, this::handleMetricsRequest, SERIALIZER::encode);
134
135 communicationService.<ControlResourceRequest, Set<String>>addSubscriber(CONTROL_RESOURCE,
136 SERIALIZER::decode, this::handleResourceRequest, SERIALIZER::encode);
Jian Li23906cc2016-03-31 11:16:44 -0700137
Jian Li7d180c52016-02-01 21:53:08 -0800138 log.info("Started");
Jian Li60804322015-12-02 14:46:31 -0800139 }
140
141 @Deactivate
142 public void deactivate() {
Jian Li7d180c52016-02-01 21:53:08 -0800143
144 // TODO: need to handle the mdb close.
145 cpuBuf.clear();
146 memoryBuf.clear();
147 diskBuf.clear();
148 networkBuf.clear();
149 ctrlMsgBuf.clear();
150
Jian Li23906cc2016-03-31 11:16:44 -0700151 communicationService.removeSubscriber(CONTROL_STATS);
Jian Li89eeccd2016-05-06 02:10:33 -0700152 communicationService.removeSubscriber(CONTROL_RESOURCE);
Jian Li23906cc2016-03-31 11:16:44 -0700153
Jian Li7d180c52016-02-01 21:53:08 -0800154 log.info("Stopped");
Jian Li60804322015-12-02 14:46:31 -0800155 }
156
Jian Li60804322015-12-02 14:46:31 -0800157 @Override
Jian Lic5cb4a12016-02-03 23:24:42 -0800158 public void updateMetric(ControlMetric cm, int updateIntervalInMinutes,
Jian Li60804322015-12-02 14:46:31 -0800159 Optional<DeviceId> deviceId) {
Jian Li7d180c52016-02-01 21:53:08 -0800160 if (deviceId.isPresent()) {
Jian Li46148902016-01-29 13:33:50 -0800161
Jian Li7d180c52016-02-01 21:53:08 -0800162 // insert a new device entry if we cannot find any
Jian Li85060ac2016-02-04 09:58:56 -0800163 ctrlMsgBuf.putIfAbsent(deviceId.get(), Maps.newConcurrentMap());
Jian Li7d180c52016-02-01 21:53:08 -0800164
165 // update control message metrics
Jian Li85060ac2016-02-04 09:58:56 -0800166 if (CONTROL_MESSAGE_METRICS.contains(cm.metricType())) {
167
168 if (!availableDeviceIdSet.contains(deviceId.get())) {
169 availableDeviceIdSet.add(deviceId.get());
170 }
Jian Li7d180c52016-02-01 21:53:08 -0800171
172 // we will accumulate the metric value into buffer first
173 ctrlMsgBuf.get(deviceId.get()).putIfAbsent(cm.metricType(),
174 (double) cm.metricValue().getLoad());
175
176 // if buffer contains all control message metrics,
177 // we simply set and update the values into MetricsDatabase.
Jian Li85060ac2016-02-04 09:58:56 -0800178 if (ctrlMsgBuf.get(deviceId.get()).keySet()
179 .containsAll(CONTROL_MESSAGE_METRICS)) {
Jian Li7d180c52016-02-01 21:53:08 -0800180 updateControlMessages(ctrlMsgBuf.get(deviceId.get()), deviceId.get());
Jian Li1aa07822016-04-19 17:58:02 -0700181 ctrlMsgBuf.clear();
Jian Li7d180c52016-02-01 21:53:08 -0800182 }
183 }
184 } else {
185
186 // update cpu metrics
187 if (CPU_METRICS.contains(cm.metricType())) {
188 cpuBuf.putIfAbsent(cm.metricType(),
189 (double) cm.metricValue().getLoad());
190 if (cpuBuf.keySet().containsAll(CPU_METRICS)) {
191 cpuMetrics.updateMetrics(convertMap(cpuBuf));
192 cpuBuf.clear();
193 }
194 }
195
196 // update memory metrics
197 if (MEMORY_METRICS.contains(cm.metricType())) {
198 memoryBuf.putIfAbsent(cm.metricType(),
199 (double) cm.metricValue().getLoad());
200 if (memoryBuf.keySet().containsAll(MEMORY_METRICS)) {
201 memoryMetrics.updateMetrics(convertMap(memoryBuf));
202 memoryBuf.clear();
203 }
204 }
205 }
Jian Li60804322015-12-02 14:46:31 -0800206 }
207
208 @Override
Jian Lic5cb4a12016-02-03 23:24:42 -0800209 public void updateMetric(ControlMetric cm, int updateIntervalInMinutes,
Jian Lie044d1a2016-01-25 09:01:20 -0800210 String resourceName) {
Jian Li7d180c52016-02-01 21:53:08 -0800211 // update disk metrics
212 if (DISK_METRICS.contains(cm.metricType())) {
Jian Li85060ac2016-02-04 09:58:56 -0800213 diskBuf.putIfAbsent(resourceName, Maps.newConcurrentMap());
214
215 availableResourceMap.putIfAbsent(Type.DISK, Sets.newHashSet());
216 availableResourceMap.computeIfPresent(Type.DISK, (k, v) -> {
217 v.add(resourceName);
218 return v;
219 });
220
Jian Li7d180c52016-02-01 21:53:08 -0800221 diskBuf.get(resourceName).putIfAbsent(cm.metricType(),
222 (double) cm.metricValue().getLoad());
223 if (diskBuf.get(resourceName).keySet().containsAll(DISK_METRICS)) {
224 updateDiskMetrics(diskBuf.get(resourceName), resourceName);
225 diskBuf.clear();
226 }
227 }
Jian Lie044d1a2016-01-25 09:01:20 -0800228
Jian Li7d180c52016-02-01 21:53:08 -0800229 // update network metrics
230 if (NETWORK_METRICS.contains(cm.metricType())) {
Jian Li85060ac2016-02-04 09:58:56 -0800231 networkBuf.putIfAbsent(resourceName, Maps.newConcurrentMap());
232
233 availableResourceMap.putIfAbsent(Type.NETWORK, Sets.newHashSet());
234 availableResourceMap.computeIfPresent(Type.NETWORK, (k, v) -> {
235 v.add(resourceName);
236 return v;
237 });
238
Jian Li7d180c52016-02-01 21:53:08 -0800239 networkBuf.get(resourceName).putIfAbsent(cm.metricType(),
240 (double) cm.metricValue().getLoad());
241 if (networkBuf.get(resourceName).keySet().containsAll(NETWORK_METRICS)) {
242 updateNetworkMetrics(networkBuf.get(resourceName), resourceName);
243 networkBuf.clear();
244 }
245 }
Jian Lie044d1a2016-01-25 09:01:20 -0800246 }
247
248 @Override
Jian Li67e1e152016-04-18 17:52:58 -0700249 public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
250 ControlMetricType type,
251 Optional<DeviceId> deviceId) {
252 if (clusterService.getLocalNode().id().equals(nodeId)) {
253 return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, deviceId)));
254 } else {
Jian Li89eeccd2016-05-06 02:10:33 -0700255 return communicationService.sendAndReceive(createMetricsRequest(type, deviceId),
Jian Li67e1e152016-04-18 17:52:58 -0700256 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
257 }
258 }
259
260 @Override
261 public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
262 ControlMetricType type,
263 String resourceName) {
264 if (clusterService.getLocalNode().id().equals(nodeId)) {
265 return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, resourceName)));
266 } else {
Jian Li89eeccd2016-05-06 02:10:33 -0700267 return communicationService.sendAndReceive(createMetricsRequest(type, resourceName),
Jian Li67e1e152016-04-18 17:52:58 -0700268 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
269 }
270 }
271
272 @Override
273 public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
274 ControlMetricType type,
275 int duration, TimeUnit unit,
276 Optional<DeviceId> deviceId) {
277 if (clusterService.getLocalNode().id().equals(nodeId)) {
278 return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, deviceId), duration, unit));
279 } else {
Jian Li89eeccd2016-05-06 02:10:33 -0700280 return communicationService.sendAndReceive(createMetricsRequest(type, duration, unit, deviceId),
Jian Li67e1e152016-04-18 17:52:58 -0700281 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
282 }
283 }
284
285 @Override
286 public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
287 ControlMetricType type,
288 int duration, TimeUnit unit,
289 String resourceName) {
290 if (clusterService.getLocalNode().id().equals(nodeId)) {
291 return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, resourceName), duration, unit));
292 } else {
Jian Li89eeccd2016-05-06 02:10:33 -0700293 return communicationService.sendAndReceive(createMetricsRequest(type, duration, unit, resourceName),
Jian Li67e1e152016-04-18 17:52:58 -0700294 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
295 }
296 }
297
298 @Override
Jian Li89eeccd2016-05-06 02:10:33 -0700299 public CompletableFuture<Set<String>> availableResources(NodeId nodeId,
300 Type resourceType) {
301 if (clusterService.getLocalNode().id().equals(nodeId)) {
302 Set<String> resources = getLocalAvailableResources(resourceType);
303 return CompletableFuture.completedFuture(resources);
304 } else {
305 return communicationService.sendAndReceive(createResourceRequest(resourceType),
306 CONTROL_RESOURCE, SERIALIZER::encode, SERIALIZER::decode, nodeId);
Jian Li67e1e152016-04-18 17:52:58 -0700307 }
Jian Li67e1e152016-04-18 17:52:58 -0700308 }
309
310 /**
311 * Builds and returns metric database instance with given resource name,
312 * resource type and metric type.
313 *
314 * @param resourceName resource name
315 * @param resourceType resource type
316 * @param metricTypes metric type
317 * @return metric database instance
318 */
319 private MetricsDatabase genMDbBuilder(String resourceName,
320 Type resourceType,
321 Set<ControlMetricType> metricTypes) {
322 MetricsDatabase.Builder builder = new DefaultMetricsDatabase.Builder();
323 builder.withMetricName(resourceType.toString());
324 builder.withResourceName(resourceName);
325 metricTypes.forEach(type -> builder.addMetricType(type.toString()));
326 return builder.build();
327 }
328
329 /**
330 * Updates network metrics with given metric map and resource name.
331 *
332 * @param metricMap a metric map which is comprised of metric type and value
333 * @param resourceName resource name
334 */
335 private void updateNetworkMetrics(Map<ControlMetricType, Double> metricMap,
336 String resourceName) {
Jian Li1aa07822016-04-19 17:58:02 -0700337 if (!networkMetricsMap.containsKey(resourceName)) {
338 networkMetricsMap.put(resourceName, genMDbBuilder(resourceName,
339 Type.NETWORK, NETWORK_METRICS));
340 }
Jian Li67e1e152016-04-18 17:52:58 -0700341 networkMetricsMap.get(resourceName).updateMetrics(convertMap(metricMap));
342 }
343
344 /**
345 * Updates disk metrics with given metric map and resource name.
346 *
347 * @param metricMap a metric map which is comprised of metric type and value
348 * @param resourceName resource name
349 */
350 private void updateDiskMetrics(Map<ControlMetricType, Double> metricMap,
351 String resourceName) {
Jian Li1aa07822016-04-19 17:58:02 -0700352 if (!diskMetricsMap.containsKey(resourceName)) {
353 diskMetricsMap.put(resourceName, genMDbBuilder(resourceName,
354 Type.DISK, DISK_METRICS));
355 }
Jian Li67e1e152016-04-18 17:52:58 -0700356 diskMetricsMap.get(resourceName).updateMetrics(convertMap(metricMap));
357 }
358
359 /**
360 * Updates control message metrics with given metric map and device identifier.
361 *
362 * @param metricMap a metric map which is comprised of metric type and value
363 * @param deviceId device identifier
364 */
365 private void updateControlMessages(Map<ControlMetricType, Double> metricMap,
366 DeviceId deviceId) {
Jian Li1aa07822016-04-19 17:58:02 -0700367 if (!controlMessageMap.containsKey(deviceId)) {
368 controlMessageMap.put(deviceId, genMDbBuilder(deviceId.toString(),
369 Type.CONTROL_MESSAGE, CONTROL_MESSAGE_METRICS));
370 }
Jian Li67e1e152016-04-18 17:52:58 -0700371 controlMessageMap.get(deviceId).updateMetrics(convertMap(metricMap));
372 }
373
374 /**
375 * Converts metric map into a new map which contains string formatted metric type as key.
376 *
377 * @param metricMap metric map in which ControlMetricType is key
378 * @return a new map in which string formatted metric type is key
379 */
380 private Map<String, Double> convertMap(Map<ControlMetricType, Double> metricMap) {
381 if (metricMap == null) {
382 return ImmutableMap.of();
383 }
384 Map newMap = Maps.newConcurrentMap();
385 metricMap.forEach((k, v) -> newMap.putIfAbsent(k.toString(), v));
386 return newMap;
387 }
388
389 /**
390 * Handles control metric request from remote node.
391 *
392 * @param request control metric request
393 * @return completable future object of control load snapshot
394 */
Jian Li89eeccd2016-05-06 02:10:33 -0700395 private CompletableFuture<ControlLoadSnapshot>
396 handleMetricsRequest(ControlMetricsRequest request) {
Jian Li67e1e152016-04-18 17:52:58 -0700397
398 checkArgument(request.getType() != null, METRIC_TYPE_NULL);
399
400 ControlLoad load;
401 if (request.getResourceName() != null && request.getUnit() != null) {
402 load = getLocalLoad(request.getType(), request.getResourceName());
403 } else {
404 load = getLocalLoad(request.getType(), request.getDeviceId());
405 }
406
407 long average;
408 if (request.getUnit() != null) {
409 average = load.average(request.getDuration(), request.getUnit());
410 } else {
411 average = load.average();
412 }
413 ControlLoadSnapshot resp =
414 new ControlLoadSnapshot(load.latest(), average, load.time());
415 return CompletableFuture.completedFuture(resp);
416 }
417
418 /**
Jian Li89eeccd2016-05-06 02:10:33 -0700419 * Handles control resource request from remote node.
420 *
421 * @param request control resource type
422 * @return completable future object of control resource set
423 */
424 private CompletableFuture<Set<String>>
425 handleResourceRequest(ControlResourceRequest request) {
426
427 checkArgument(request.getType() != null, RESOURCE_TYPE_NULL);
428
429 Set<String> resources = getLocalAvailableResources(request.getType());
430 return CompletableFuture.completedFuture(resources);
431 }
432
433 /**
Jian Li67e1e152016-04-18 17:52:58 -0700434 * Generates a control metric request.
435 *
436 * @param type control metric type
437 * @param deviceId device identifier
438 * @return control metric request instance
439 */
Jian Li89eeccd2016-05-06 02:10:33 -0700440 private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
441 Optional<DeviceId> deviceId) {
Jian Li67e1e152016-04-18 17:52:58 -0700442 return new ControlMetricsRequest(type, deviceId);
443 }
444
445 /**
446 * Generates a control metric request with given projected time range.
447 *
448 * @param type control metric type
449 * @param duration projected time duration
450 * @param unit projected time unit
451 * @param deviceId device identifier
452 * @return control metric request instance
453 */
Jian Li89eeccd2016-05-06 02:10:33 -0700454 private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
455 int duration, TimeUnit unit,
456 Optional<DeviceId> deviceId) {
Jian Li67e1e152016-04-18 17:52:58 -0700457 return new ControlMetricsRequest(type, duration, unit, deviceId);
458 }
459
460 /**
461 * Generates a control metric request.
462 *
463 * @param type control metric type
464 * @param resourceName resource name
465 * @return control metric request instance
466 */
Jian Li89eeccd2016-05-06 02:10:33 -0700467 private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
468 String resourceName) {
Jian Li67e1e152016-04-18 17:52:58 -0700469 return new ControlMetricsRequest(type, resourceName);
470 }
471
472 /**
473 * Generates a control metric request with given projected time range.
474 *
475 * @param type control metric type
476 * @param duration projected time duration
477 * @param unit projected time unit
478 * @param resourceName resource name
479 * @return control metric request instance
480 */
Jian Li89eeccd2016-05-06 02:10:33 -0700481 private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
482 int duration, TimeUnit unit,
483 String resourceName) {
Jian Li67e1e152016-04-18 17:52:58 -0700484 return new ControlMetricsRequest(type, duration, unit, resourceName);
485 }
486
487 /**
Jian Li89eeccd2016-05-06 02:10:33 -0700488 * Generates a control resource request with given resource type.
489 *
490 * @param type control resource type
491 * @return control resource request instance
492 */
493 private ControlResourceRequest createResourceRequest(ControlResource.Type type) {
494 return new ControlResourceRequest(type);
495 }
496
497 /**
Jian Li67e1e152016-04-18 17:52:58 -0700498 * Returns a snapshot of control load.
499 *
500 * @param cl control load
501 * @return a snapshot of control load
502 */
503 private ControlLoadSnapshot snapshot(ControlLoad cl) {
504 if (cl != null) {
505 return new ControlLoadSnapshot(cl.latest(), cl.average(), cl.time());
506 }
507 return null;
508 }
509
510 /**
511 * Returns a snapshot of control load with given projected time range.
512 *
513 * @param cl control load
514 * @param duration projected time duration
515 * @param unit projected time unit
516 * @return a snapshot of control load
517 */
518 private ControlLoadSnapshot snapshot(ControlLoad cl, int duration, TimeUnit unit) {
519 if (cl != null) {
Jian Li1aa07822016-04-19 17:58:02 -0700520
521 return new ControlLoadSnapshot(cl.latest(), cl.average(duration, unit),
522 cl.time(), cl.recent(duration, unit));
Jian Li67e1e152016-04-18 17:52:58 -0700523 }
524 return null;
525 }
526
527 /**
528 * Returns local control load.
529 *
530 * @param type metric type
531 * @param deviceId device identifier
532 * @return control load
533 */
534 private ControlLoad getLocalLoad(ControlMetricType type,
535 Optional<DeviceId> deviceId) {
Jian Li23906cc2016-03-31 11:16:44 -0700536 if (deviceId.isPresent()) {
Jian Li67e1e152016-04-18 17:52:58 -0700537 // returns control message stats
Jian Li23906cc2016-03-31 11:16:44 -0700538 if (CONTROL_MESSAGE_METRICS.contains(type) &&
539 availableDeviceIdSet.contains(deviceId.get())) {
540 return new DefaultControlLoad(controlMessageMap.get(deviceId.get()), type);
Jian Li7d180c52016-02-01 21:53:08 -0800541 }
542 } else {
Jian Li23906cc2016-03-31 11:16:44 -0700543 // returns controlLoad of CPU metrics
544 if (CPU_METRICS.contains(type)) {
545 return new DefaultControlLoad(cpuMetrics, type);
546 }
547
548 // returns memoryLoad of memory metrics
549 if (MEMORY_METRICS.contains(type)) {
550 return new DefaultControlLoad(memoryMetrics, type);
551 }
Jian Li7d180c52016-02-01 21:53:08 -0800552 }
Jian Li60804322015-12-02 14:46:31 -0800553 return null;
554 }
555
Jian Li67e1e152016-04-18 17:52:58 -0700556 /**
557 * Returns local control load.
558 *
559 * @param type metric type
560 * @param resourceName resource name
561 * @return control load
562 */
563 private ControlLoad getLocalLoad(ControlMetricType type, String resourceName) {
Jian Li89eeccd2016-05-06 02:10:33 -0700564 NodeId localNodeId = clusterService.getLocalNode().id();
565
Jian Li67e1e152016-04-18 17:52:58 -0700566 // returns disk I/O stats
Jian Li23906cc2016-03-31 11:16:44 -0700567 if (DISK_METRICS.contains(type) &&
Jian Li89eeccd2016-05-06 02:10:33 -0700568 availableResourcesSync(localNodeId, Type.DISK).contains(resourceName)) {
Jian Li23906cc2016-03-31 11:16:44 -0700569 return new DefaultControlLoad(diskMetricsMap.get(resourceName), type);
570 }
Jian Li7d180c52016-02-01 21:53:08 -0800571
Jian Li67e1e152016-04-18 17:52:58 -0700572 // returns network I/O stats
Jian Li23906cc2016-03-31 11:16:44 -0700573 if (NETWORK_METRICS.contains(type) &&
Jian Li89eeccd2016-05-06 02:10:33 -0700574 availableResourcesSync(localNodeId, Type.NETWORK).contains(resourceName)) {
Jian Li23906cc2016-03-31 11:16:44 -0700575 return new DefaultControlLoad(networkMetricsMap.get(resourceName), type);
Jian Li7d180c52016-02-01 21:53:08 -0800576 }
Jian Li60804322015-12-02 14:46:31 -0800577 return null;
578 }
Jian Li89eeccd2016-05-06 02:10:33 -0700579
580 /**
581 * Obtains the available resource list from local node.
582 *
583 * @param resourceType control resource type
584 * @return a set of available control resource
585 */
586 private Set<String> getLocalAvailableResources(Type resourceType) {
587 Set<String> resources = ImmutableSet.of();
588 if (RESOURCE_TYPE_SET.contains(resourceType)) {
589 if (Type.CONTROL_MESSAGE.equals(resourceType)) {
590 resources = ImmutableSet.copyOf(availableDeviceIdSet.stream()
591 .map(DeviceId::toString).collect(Collectors.toSet()));
592 } else {
593 Set<String> res = availableResourceMap.get(resourceType);
594 resources = res == null ? ImmutableSet.of() : res;
595 }
596 }
597 return resources;
598 }
Jian Li23906cc2016-03-31 11:16:44 -0700599}