blob: 37d542d6ee2f8a171253f13c86f43538d6c10270 [file] [log] [blame]
Jian Li60804322015-12-02 14:46:31 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2016-present Open Networking Laboratory
Jian Li60804322015-12-02 14:46:31 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jian Li6b86a762016-01-29 09:30:40 -080016package org.onosproject.cpman.impl;
Jian Li60804322015-12-02 14:46:31 -080017
Jian Li9f3a8852016-04-07 13:37:39 -070018import com.google.common.collect.ImmutableMap;
Jian Li7d180c52016-02-01 21:53:08 -080019import com.google.common.collect.ImmutableSet;
Jian Li85060ac2016-02-04 09:58:56 -080020import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
Jian Li60804322015-12-02 14:46:31 -080022import org.apache.felix.scr.annotations.Activate;
Jian Lic132c112016-01-28 20:27:34 -080023import org.apache.felix.scr.annotations.Component;
Jian Li60804322015-12-02 14:46:31 -080024import org.apache.felix.scr.annotations.Deactivate;
Jian Li7d180c52016-02-01 21:53:08 -080025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
Jian Li60804322015-12-02 14:46:31 -080027import org.apache.felix.scr.annotations.Service;
Jian Li23906cc2016-03-31 11:16:44 -070028import org.onlab.util.KryoNamespace;
Jian Li7d180c52016-02-01 21:53:08 -080029import org.onosproject.cluster.ClusterService;
Jian Li60804322015-12-02 14:46:31 -080030import org.onosproject.cluster.NodeId;
Jian Li6b86a762016-01-29 09:30:40 -080031import org.onosproject.cpman.ControlLoad;
Jian Li67e1e152016-04-18 17:52:58 -070032import org.onosproject.cpman.ControlLoadSnapshot;
Jian Li6b86a762016-01-29 09:30:40 -080033import org.onosproject.cpman.ControlMetric;
34import org.onosproject.cpman.ControlMetricType;
Jian Li67e1e152016-04-18 17:52:58 -070035import org.onosproject.cpman.ControlMetricsRequest;
Jian Li6b86a762016-01-29 09:30:40 -080036import org.onosproject.cpman.ControlPlaneMonitorService;
Jian Li89eeccd2016-05-06 02:10:33 -070037import org.onosproject.cpman.ControlResource;
38import org.onosproject.cpman.ControlResourceRequest;
Jian Li7d180c52016-02-01 21:53:08 -080039import org.onosproject.cpman.MetricsDatabase;
Jian Li60804322015-12-02 14:46:31 -080040import org.onosproject.net.DeviceId;
Jian Li23906cc2016-03-31 11:16:44 -070041import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
42import org.onosproject.store.cluster.messaging.MessageSubject;
43import org.onosproject.store.serializers.KryoNamespaces;
44import org.onosproject.store.service.Serializer;
Jian Li60804322015-12-02 14:46:31 -080045import org.slf4j.Logger;
Jian Li7d180c52016-02-01 21:53:08 -080046import org.slf4j.LoggerFactory;
Jian Li60804322015-12-02 14:46:31 -080047
Jian Li7d180c52016-02-01 21:53:08 -080048import java.util.Map;
Jian Li60804322015-12-02 14:46:31 -080049import java.util.Optional;
Jian Lic5cb4a12016-02-03 23:24:42 -080050import java.util.Set;
Jian Li23906cc2016-03-31 11:16:44 -070051import java.util.concurrent.CompletableFuture;
Jian Li67e1e152016-04-18 17:52:58 -070052import java.util.concurrent.TimeUnit;
Jian Li85060ac2016-02-04 09:58:56 -080053import java.util.stream.Collectors;
Jian Li60804322015-12-02 14:46:31 -080054
Jian Li23906cc2016-03-31 11:16:44 -070055import static com.google.common.base.Preconditions.checkArgument;
Jian Li23906cc2016-03-31 11:16:44 -070056import static org.onosproject.cpman.ControlResource.CONTROL_MESSAGE_METRICS;
57import static org.onosproject.cpman.ControlResource.CPU_METRICS;
58import static org.onosproject.cpman.ControlResource.DISK_METRICS;
59import static org.onosproject.cpman.ControlResource.MEMORY_METRICS;
60import static org.onosproject.cpman.ControlResource.NETWORK_METRICS;
61import static org.onosproject.cpman.ControlResource.Type;
Jian Li60804322015-12-02 14:46:31 -080062
63/**
64 * Control plane monitoring service class.
65 */
66@Component(immediate = true)
67@Service
68public class ControlPlaneMonitor implements ControlPlaneMonitorService {
69
Jian Li7d180c52016-02-01 21:53:08 -080070 private final Logger log = LoggerFactory.getLogger(getClass());
71 private MetricsDatabase cpuMetrics;
72 private MetricsDatabase memoryMetrics;
73 private Map<DeviceId, MetricsDatabase> controlMessageMap;
74 private Map<String, MetricsDatabase> diskMetricsMap;
75 private Map<String, MetricsDatabase> networkMetricsMap;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected ClusterService clusterService;
79
Jian Li23906cc2016-03-31 11:16:44 -070080 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81 protected ClusterCommunicationService communicationService;
82
Jian Lidaf55ea2016-04-04 20:38:30 -070083 private static final String DEFAULT_RESOURCE = "default";
84
Jian Li85060ac2016-02-04 09:58:56 -080085 private static final Set RESOURCE_TYPE_SET =
86 ImmutableSet.of(Type.CONTROL_MESSAGE, Type.DISK, Type.NETWORK);
Jian Li7d180c52016-02-01 21:53:08 -080087
Jian Li23906cc2016-03-31 11:16:44 -070088 private static final MessageSubject CONTROL_STATS =
Jian Li67e1e152016-04-18 17:52:58 -070089 new MessageSubject("control-plane-stats");
Jian Li23906cc2016-03-31 11:16:44 -070090
Jian Li89eeccd2016-05-06 02:10:33 -070091 private static final MessageSubject CONTROL_RESOURCE =
92 new MessageSubject("control-plane-resources");
93
Jian Li7d180c52016-02-01 21:53:08 -080094 private Map<ControlMetricType, Double> cpuBuf;
95 private Map<ControlMetricType, Double> memoryBuf;
96 private Map<String, Map<ControlMetricType, Double>> diskBuf;
97 private Map<String, Map<ControlMetricType, Double>> networkBuf;
98 private Map<DeviceId, Map<ControlMetricType, Double>> ctrlMsgBuf;
Jian Li60804322015-12-02 14:46:31 -080099
Jian Li85060ac2016-02-04 09:58:56 -0800100 private Map<Type, Set<String>> availableResourceMap;
101 private Set<DeviceId> availableDeviceIdSet;
102
Jian Li23906cc2016-03-31 11:16:44 -0700103 private static final String METRIC_TYPE_NULL = "Control metric type cannot be null";
Jian Li89eeccd2016-05-06 02:10:33 -0700104 private static final String RESOURCE_TYPE_NULL = "Control resource type cannot be null";
Jian Li23906cc2016-03-31 11:16:44 -0700105
106 private static final Serializer SERIALIZER = Serializer
107 .using(new KryoNamespace.Builder()
108 .register(KryoNamespaces.API)
Jian Li67e1e152016-04-18 17:52:58 -0700109 .register(KryoNamespaces.BASIC)
Jian Li23906cc2016-03-31 11:16:44 -0700110 .register(ControlMetricsRequest.class)
Jian Li89eeccd2016-05-06 02:10:33 -0700111 .register(ControlResourceRequest.class)
Jian Li67e1e152016-04-18 17:52:58 -0700112 .register(ControlLoadSnapshot.class)
Jian Li23906cc2016-03-31 11:16:44 -0700113 .register(ControlMetricType.class)
Jian Li89eeccd2016-05-06 02:10:33 -0700114 .register(ControlResource.Type.class)
Jian Li67e1e152016-04-18 17:52:58 -0700115 .register(TimeUnit.class)
Jian Li23906cc2016-03-31 11:16:44 -0700116 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());
117
Jian Li60804322015-12-02 14:46:31 -0800118 @Activate
119 public void activate() {
Jian Lidaf55ea2016-04-04 20:38:30 -0700120 cpuMetrics = genMDbBuilder(DEFAULT_RESOURCE, Type.CPU, CPU_METRICS);
121 memoryMetrics = genMDbBuilder(DEFAULT_RESOURCE, Type.MEMORY, MEMORY_METRICS);
Jian Li85060ac2016-02-04 09:58:56 -0800122 controlMessageMap = Maps.newConcurrentMap();
123 diskMetricsMap = Maps.newConcurrentMap();
124 networkMetricsMap = Maps.newConcurrentMap();
Jian Li7d180c52016-02-01 21:53:08 -0800125
Jian Li85060ac2016-02-04 09:58:56 -0800126 cpuBuf = Maps.newConcurrentMap();
127 memoryBuf = Maps.newConcurrentMap();
128 diskBuf = Maps.newConcurrentMap();
129 networkBuf = Maps.newConcurrentMap();
130 ctrlMsgBuf = Maps.newConcurrentMap();
131
132 availableResourceMap = Maps.newConcurrentMap();
133 availableDeviceIdSet = Sets.newConcurrentHashSet();
Jian Li7d180c52016-02-01 21:53:08 -0800134
Jian Li67e1e152016-04-18 17:52:58 -0700135 communicationService.<ControlMetricsRequest, ControlLoadSnapshot>addSubscriber(CONTROL_STATS,
Jian Li89eeccd2016-05-06 02:10:33 -0700136 SERIALIZER::decode, this::handleMetricsRequest, SERIALIZER::encode);
137
138 communicationService.<ControlResourceRequest, Set<String>>addSubscriber(CONTROL_RESOURCE,
139 SERIALIZER::decode, this::handleResourceRequest, SERIALIZER::encode);
Jian Li23906cc2016-03-31 11:16:44 -0700140
Jian Li7d180c52016-02-01 21:53:08 -0800141 log.info("Started");
Jian Li60804322015-12-02 14:46:31 -0800142 }
143
144 @Deactivate
145 public void deactivate() {
Jian Li7d180c52016-02-01 21:53:08 -0800146
147 // TODO: need to handle the mdb close.
148 cpuBuf.clear();
149 memoryBuf.clear();
150 diskBuf.clear();
151 networkBuf.clear();
152 ctrlMsgBuf.clear();
153
Jian Li23906cc2016-03-31 11:16:44 -0700154 communicationService.removeSubscriber(CONTROL_STATS);
Jian Li89eeccd2016-05-06 02:10:33 -0700155 communicationService.removeSubscriber(CONTROL_RESOURCE);
Jian Li23906cc2016-03-31 11:16:44 -0700156
Jian Li7d180c52016-02-01 21:53:08 -0800157 log.info("Stopped");
Jian Li60804322015-12-02 14:46:31 -0800158 }
159
Jian Li60804322015-12-02 14:46:31 -0800160 @Override
Jian Lic5cb4a12016-02-03 23:24:42 -0800161 public void updateMetric(ControlMetric cm, int updateIntervalInMinutes,
Jian Li60804322015-12-02 14:46:31 -0800162 Optional<DeviceId> deviceId) {
Jian Li7d180c52016-02-01 21:53:08 -0800163 if (deviceId.isPresent()) {
Jian Li46148902016-01-29 13:33:50 -0800164
Jian Li7d180c52016-02-01 21:53:08 -0800165 // insert a new device entry if we cannot find any
Jian Li85060ac2016-02-04 09:58:56 -0800166 ctrlMsgBuf.putIfAbsent(deviceId.get(), Maps.newConcurrentMap());
Jian Li7d180c52016-02-01 21:53:08 -0800167
168 // update control message metrics
Jian Li85060ac2016-02-04 09:58:56 -0800169 if (CONTROL_MESSAGE_METRICS.contains(cm.metricType())) {
170
171 if (!availableDeviceIdSet.contains(deviceId.get())) {
172 availableDeviceIdSet.add(deviceId.get());
173 }
Jian Li7d180c52016-02-01 21:53:08 -0800174
175 // we will accumulate the metric value into buffer first
176 ctrlMsgBuf.get(deviceId.get()).putIfAbsent(cm.metricType(),
177 (double) cm.metricValue().getLoad());
178
179 // if buffer contains all control message metrics,
180 // we simply set and update the values into MetricsDatabase.
Jian Li85060ac2016-02-04 09:58:56 -0800181 if (ctrlMsgBuf.get(deviceId.get()).keySet()
182 .containsAll(CONTROL_MESSAGE_METRICS)) {
Jian Li7d180c52016-02-01 21:53:08 -0800183 updateControlMessages(ctrlMsgBuf.get(deviceId.get()), deviceId.get());
Jian Li1aa07822016-04-19 17:58:02 -0700184 ctrlMsgBuf.clear();
Jian Li7d180c52016-02-01 21:53:08 -0800185 }
186 }
187 } else {
188
189 // update cpu metrics
190 if (CPU_METRICS.contains(cm.metricType())) {
191 cpuBuf.putIfAbsent(cm.metricType(),
192 (double) cm.metricValue().getLoad());
193 if (cpuBuf.keySet().containsAll(CPU_METRICS)) {
194 cpuMetrics.updateMetrics(convertMap(cpuBuf));
195 cpuBuf.clear();
196 }
197 }
198
199 // update memory metrics
200 if (MEMORY_METRICS.contains(cm.metricType())) {
201 memoryBuf.putIfAbsent(cm.metricType(),
202 (double) cm.metricValue().getLoad());
203 if (memoryBuf.keySet().containsAll(MEMORY_METRICS)) {
204 memoryMetrics.updateMetrics(convertMap(memoryBuf));
205 memoryBuf.clear();
206 }
207 }
208 }
Jian Li60804322015-12-02 14:46:31 -0800209 }
210
211 @Override
Jian Lic5cb4a12016-02-03 23:24:42 -0800212 public void updateMetric(ControlMetric cm, int updateIntervalInMinutes,
Jian Lie044d1a2016-01-25 09:01:20 -0800213 String resourceName) {
Jian Li7d180c52016-02-01 21:53:08 -0800214 // update disk metrics
215 if (DISK_METRICS.contains(cm.metricType())) {
Jian Li85060ac2016-02-04 09:58:56 -0800216 diskBuf.putIfAbsent(resourceName, Maps.newConcurrentMap());
217
218 availableResourceMap.putIfAbsent(Type.DISK, Sets.newHashSet());
219 availableResourceMap.computeIfPresent(Type.DISK, (k, v) -> {
220 v.add(resourceName);
221 return v;
222 });
223
Jian Li7d180c52016-02-01 21:53:08 -0800224 diskBuf.get(resourceName).putIfAbsent(cm.metricType(),
225 (double) cm.metricValue().getLoad());
226 if (diskBuf.get(resourceName).keySet().containsAll(DISK_METRICS)) {
227 updateDiskMetrics(diskBuf.get(resourceName), resourceName);
228 diskBuf.clear();
229 }
230 }
Jian Lie044d1a2016-01-25 09:01:20 -0800231
Jian Li7d180c52016-02-01 21:53:08 -0800232 // update network metrics
233 if (NETWORK_METRICS.contains(cm.metricType())) {
Jian Li85060ac2016-02-04 09:58:56 -0800234 networkBuf.putIfAbsent(resourceName, Maps.newConcurrentMap());
235
236 availableResourceMap.putIfAbsent(Type.NETWORK, Sets.newHashSet());
237 availableResourceMap.computeIfPresent(Type.NETWORK, (k, v) -> {
238 v.add(resourceName);
239 return v;
240 });
241
Jian Li7d180c52016-02-01 21:53:08 -0800242 networkBuf.get(resourceName).putIfAbsent(cm.metricType(),
243 (double) cm.metricValue().getLoad());
244 if (networkBuf.get(resourceName).keySet().containsAll(NETWORK_METRICS)) {
245 updateNetworkMetrics(networkBuf.get(resourceName), resourceName);
246 networkBuf.clear();
247 }
248 }
Jian Lie044d1a2016-01-25 09:01:20 -0800249 }
250
251 @Override
Jian Li67e1e152016-04-18 17:52:58 -0700252 public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
253 ControlMetricType type,
254 Optional<DeviceId> deviceId) {
255 if (clusterService.getLocalNode().id().equals(nodeId)) {
256 return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, deviceId)));
257 } else {
Jian Li89eeccd2016-05-06 02:10:33 -0700258 return communicationService.sendAndReceive(createMetricsRequest(type, deviceId),
Jian Li67e1e152016-04-18 17:52:58 -0700259 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
260 }
261 }
262
263 @Override
264 public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
265 ControlMetricType type,
266 String resourceName) {
267 if (clusterService.getLocalNode().id().equals(nodeId)) {
268 return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, resourceName)));
269 } else {
Jian Li89eeccd2016-05-06 02:10:33 -0700270 return communicationService.sendAndReceive(createMetricsRequest(type, resourceName),
Jian Li67e1e152016-04-18 17:52:58 -0700271 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
272 }
273 }
274
275 @Override
276 public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
277 ControlMetricType type,
278 int duration, TimeUnit unit,
279 Optional<DeviceId> deviceId) {
280 if (clusterService.getLocalNode().id().equals(nodeId)) {
281 return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, deviceId), duration, unit));
282 } else {
Jian Li89eeccd2016-05-06 02:10:33 -0700283 return communicationService.sendAndReceive(createMetricsRequest(type, duration, unit, deviceId),
Jian Li67e1e152016-04-18 17:52:58 -0700284 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
285 }
286 }
287
288 @Override
289 public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId,
290 ControlMetricType type,
291 int duration, TimeUnit unit,
292 String resourceName) {
293 if (clusterService.getLocalNode().id().equals(nodeId)) {
294 return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, resourceName), duration, unit));
295 } else {
Jian Li89eeccd2016-05-06 02:10:33 -0700296 return communicationService.sendAndReceive(createMetricsRequest(type, duration, unit, resourceName),
Jian Li67e1e152016-04-18 17:52:58 -0700297 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
298 }
299 }
300
301 @Override
Jian Li89eeccd2016-05-06 02:10:33 -0700302 public CompletableFuture<Set<String>> availableResources(NodeId nodeId,
303 Type resourceType) {
304 if (clusterService.getLocalNode().id().equals(nodeId)) {
305 Set<String> resources = getLocalAvailableResources(resourceType);
306 return CompletableFuture.completedFuture(resources);
307 } else {
308 return communicationService.sendAndReceive(createResourceRequest(resourceType),
309 CONTROL_RESOURCE, SERIALIZER::encode, SERIALIZER::decode, nodeId);
Jian Li67e1e152016-04-18 17:52:58 -0700310 }
Jian Li67e1e152016-04-18 17:52:58 -0700311 }
312
313 /**
314 * Builds and returns metric database instance with given resource name,
315 * resource type and metric type.
316 *
317 * @param resourceName resource name
318 * @param resourceType resource type
319 * @param metricTypes metric type
320 * @return metric database instance
321 */
322 private MetricsDatabase genMDbBuilder(String resourceName,
323 Type resourceType,
324 Set<ControlMetricType> metricTypes) {
325 MetricsDatabase.Builder builder = new DefaultMetricsDatabase.Builder();
326 builder.withMetricName(resourceType.toString());
327 builder.withResourceName(resourceName);
328 metricTypes.forEach(type -> builder.addMetricType(type.toString()));
329 return builder.build();
330 }
331
332 /**
333 * Updates network metrics with given metric map and resource name.
334 *
335 * @param metricMap a metric map which is comprised of metric type and value
336 * @param resourceName resource name
337 */
338 private void updateNetworkMetrics(Map<ControlMetricType, Double> metricMap,
339 String resourceName) {
Jian Li1aa07822016-04-19 17:58:02 -0700340 if (!networkMetricsMap.containsKey(resourceName)) {
341 networkMetricsMap.put(resourceName, genMDbBuilder(resourceName,
342 Type.NETWORK, NETWORK_METRICS));
343 }
Jian Li67e1e152016-04-18 17:52:58 -0700344 networkMetricsMap.get(resourceName).updateMetrics(convertMap(metricMap));
345 }
346
347 /**
348 * Updates disk metrics with given metric map and resource name.
349 *
350 * @param metricMap a metric map which is comprised of metric type and value
351 * @param resourceName resource name
352 */
353 private void updateDiskMetrics(Map<ControlMetricType, Double> metricMap,
354 String resourceName) {
Jian Li1aa07822016-04-19 17:58:02 -0700355 if (!diskMetricsMap.containsKey(resourceName)) {
356 diskMetricsMap.put(resourceName, genMDbBuilder(resourceName,
357 Type.DISK, DISK_METRICS));
358 }
Jian Li67e1e152016-04-18 17:52:58 -0700359 diskMetricsMap.get(resourceName).updateMetrics(convertMap(metricMap));
360 }
361
362 /**
363 * Updates control message metrics with given metric map and device identifier.
364 *
365 * @param metricMap a metric map which is comprised of metric type and value
366 * @param deviceId device identifier
367 */
368 private void updateControlMessages(Map<ControlMetricType, Double> metricMap,
369 DeviceId deviceId) {
Jian Li1aa07822016-04-19 17:58:02 -0700370 if (!controlMessageMap.containsKey(deviceId)) {
371 controlMessageMap.put(deviceId, genMDbBuilder(deviceId.toString(),
372 Type.CONTROL_MESSAGE, CONTROL_MESSAGE_METRICS));
373 }
Jian Li67e1e152016-04-18 17:52:58 -0700374 controlMessageMap.get(deviceId).updateMetrics(convertMap(metricMap));
375 }
376
377 /**
378 * Converts metric map into a new map which contains string formatted metric type as key.
379 *
380 * @param metricMap metric map in which ControlMetricType is key
381 * @return a new map in which string formatted metric type is key
382 */
383 private Map<String, Double> convertMap(Map<ControlMetricType, Double> metricMap) {
384 if (metricMap == null) {
385 return ImmutableMap.of();
386 }
387 Map newMap = Maps.newConcurrentMap();
388 metricMap.forEach((k, v) -> newMap.putIfAbsent(k.toString(), v));
389 return newMap;
390 }
391
392 /**
393 * Handles control metric request from remote node.
394 *
395 * @param request control metric request
396 * @return completable future object of control load snapshot
397 */
Jian Li89eeccd2016-05-06 02:10:33 -0700398 private CompletableFuture<ControlLoadSnapshot>
399 handleMetricsRequest(ControlMetricsRequest request) {
Jian Li67e1e152016-04-18 17:52:58 -0700400
401 checkArgument(request.getType() != null, METRIC_TYPE_NULL);
402
403 ControlLoad load;
404 if (request.getResourceName() != null && request.getUnit() != null) {
405 load = getLocalLoad(request.getType(), request.getResourceName());
406 } else {
407 load = getLocalLoad(request.getType(), request.getDeviceId());
408 }
409
410 long average;
411 if (request.getUnit() != null) {
412 average = load.average(request.getDuration(), request.getUnit());
413 } else {
414 average = load.average();
415 }
416 ControlLoadSnapshot resp =
417 new ControlLoadSnapshot(load.latest(), average, load.time());
418 return CompletableFuture.completedFuture(resp);
419 }
420
421 /**
Jian Li89eeccd2016-05-06 02:10:33 -0700422 * Handles control resource request from remote node.
423 *
424 * @param request control resource type
425 * @return completable future object of control resource set
426 */
427 private CompletableFuture<Set<String>>
428 handleResourceRequest(ControlResourceRequest request) {
429
430 checkArgument(request.getType() != null, RESOURCE_TYPE_NULL);
431
432 Set<String> resources = getLocalAvailableResources(request.getType());
433 return CompletableFuture.completedFuture(resources);
434 }
435
436 /**
Jian Li67e1e152016-04-18 17:52:58 -0700437 * Generates a control metric request.
438 *
439 * @param type control metric type
440 * @param deviceId device identifier
441 * @return control metric request instance
442 */
Jian Li89eeccd2016-05-06 02:10:33 -0700443 private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
444 Optional<DeviceId> deviceId) {
Jian Li67e1e152016-04-18 17:52:58 -0700445 return new ControlMetricsRequest(type, deviceId);
446 }
447
448 /**
449 * Generates a control metric request with given projected time range.
450 *
451 * @param type control metric type
452 * @param duration projected time duration
453 * @param unit projected time unit
454 * @param deviceId device identifier
455 * @return control metric request instance
456 */
Jian Li89eeccd2016-05-06 02:10:33 -0700457 private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
458 int duration, TimeUnit unit,
459 Optional<DeviceId> deviceId) {
Jian Li67e1e152016-04-18 17:52:58 -0700460 return new ControlMetricsRequest(type, duration, unit, deviceId);
461 }
462
463 /**
464 * Generates a control metric request.
465 *
466 * @param type control metric type
467 * @param resourceName resource name
468 * @return control metric request instance
469 */
Jian Li89eeccd2016-05-06 02:10:33 -0700470 private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
471 String resourceName) {
Jian Li67e1e152016-04-18 17:52:58 -0700472 return new ControlMetricsRequest(type, resourceName);
473 }
474
475 /**
476 * Generates a control metric request with given projected time range.
477 *
478 * @param type control metric type
479 * @param duration projected time duration
480 * @param unit projected time unit
481 * @param resourceName resource name
482 * @return control metric request instance
483 */
Jian Li89eeccd2016-05-06 02:10:33 -0700484 private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
485 int duration, TimeUnit unit,
486 String resourceName) {
Jian Li67e1e152016-04-18 17:52:58 -0700487 return new ControlMetricsRequest(type, duration, unit, resourceName);
488 }
489
490 /**
Jian Li89eeccd2016-05-06 02:10:33 -0700491 * Generates a control resource request with given resource type.
492 *
493 * @param type control resource type
494 * @return control resource request instance
495 */
496 private ControlResourceRequest createResourceRequest(ControlResource.Type type) {
497 return new ControlResourceRequest(type);
498 }
499
500 /**
Jian Li67e1e152016-04-18 17:52:58 -0700501 * Returns a snapshot of control load.
502 *
503 * @param cl control load
504 * @return a snapshot of control load
505 */
506 private ControlLoadSnapshot snapshot(ControlLoad cl) {
507 if (cl != null) {
508 return new ControlLoadSnapshot(cl.latest(), cl.average(), cl.time());
509 }
510 return null;
511 }
512
513 /**
514 * Returns a snapshot of control load with given projected time range.
515 *
516 * @param cl control load
517 * @param duration projected time duration
518 * @param unit projected time unit
519 * @return a snapshot of control load
520 */
521 private ControlLoadSnapshot snapshot(ControlLoad cl, int duration, TimeUnit unit) {
522 if (cl != null) {
Jian Li1aa07822016-04-19 17:58:02 -0700523
524 return new ControlLoadSnapshot(cl.latest(), cl.average(duration, unit),
525 cl.time(), cl.recent(duration, unit));
Jian Li67e1e152016-04-18 17:52:58 -0700526 }
527 return null;
528 }
529
530 /**
531 * Returns local control load.
532 *
533 * @param type metric type
534 * @param deviceId device identifier
535 * @return control load
536 */
537 private ControlLoad getLocalLoad(ControlMetricType type,
538 Optional<DeviceId> deviceId) {
Jian Li23906cc2016-03-31 11:16:44 -0700539 if (deviceId.isPresent()) {
Jian Li67e1e152016-04-18 17:52:58 -0700540 // returns control message stats
Jian Li23906cc2016-03-31 11:16:44 -0700541 if (CONTROL_MESSAGE_METRICS.contains(type) &&
542 availableDeviceIdSet.contains(deviceId.get())) {
543 return new DefaultControlLoad(controlMessageMap.get(deviceId.get()), type);
Jian Li7d180c52016-02-01 21:53:08 -0800544 }
545 } else {
Jian Li23906cc2016-03-31 11:16:44 -0700546 // returns controlLoad of CPU metrics
547 if (CPU_METRICS.contains(type)) {
548 return new DefaultControlLoad(cpuMetrics, type);
549 }
550
551 // returns memoryLoad of memory metrics
552 if (MEMORY_METRICS.contains(type)) {
553 return new DefaultControlLoad(memoryMetrics, type);
554 }
Jian Li7d180c52016-02-01 21:53:08 -0800555 }
Jian Li60804322015-12-02 14:46:31 -0800556 return null;
557 }
558
Jian Li67e1e152016-04-18 17:52:58 -0700559 /**
560 * Returns local control load.
561 *
562 * @param type metric type
563 * @param resourceName resource name
564 * @return control load
565 */
566 private ControlLoad getLocalLoad(ControlMetricType type, String resourceName) {
Jian Li89eeccd2016-05-06 02:10:33 -0700567 NodeId localNodeId = clusterService.getLocalNode().id();
568
Jian Li67e1e152016-04-18 17:52:58 -0700569 // returns disk I/O stats
Jian Li23906cc2016-03-31 11:16:44 -0700570 if (DISK_METRICS.contains(type) &&
Jian Li89eeccd2016-05-06 02:10:33 -0700571 availableResourcesSync(localNodeId, Type.DISK).contains(resourceName)) {
Jian Li23906cc2016-03-31 11:16:44 -0700572 return new DefaultControlLoad(diskMetricsMap.get(resourceName), type);
573 }
Jian Li7d180c52016-02-01 21:53:08 -0800574
Jian Li67e1e152016-04-18 17:52:58 -0700575 // returns network I/O stats
Jian Li23906cc2016-03-31 11:16:44 -0700576 if (NETWORK_METRICS.contains(type) &&
Jian Li89eeccd2016-05-06 02:10:33 -0700577 availableResourcesSync(localNodeId, Type.NETWORK).contains(resourceName)) {
Jian Li23906cc2016-03-31 11:16:44 -0700578 return new DefaultControlLoad(networkMetricsMap.get(resourceName), type);
Jian Li7d180c52016-02-01 21:53:08 -0800579 }
Jian Li60804322015-12-02 14:46:31 -0800580 return null;
581 }
Jian Li89eeccd2016-05-06 02:10:33 -0700582
583 /**
584 * Obtains the available resource list from local node.
585 *
586 * @param resourceType control resource type
587 * @return a set of available control resource
588 */
589 private Set<String> getLocalAvailableResources(Type resourceType) {
590 Set<String> resources = ImmutableSet.of();
591 if (RESOURCE_TYPE_SET.contains(resourceType)) {
592 if (Type.CONTROL_MESSAGE.equals(resourceType)) {
593 resources = ImmutableSet.copyOf(availableDeviceIdSet.stream()
594 .map(DeviceId::toString).collect(Collectors.toSet()));
595 } else {
596 Set<String> res = availableResourceMap.get(resourceType);
597 resources = res == null ? ImmutableSet.of() : res;
598 }
599 }
600 return resources;
601 }
Jian Li23906cc2016-03-31 11:16:44 -0700602}