blob: 68981a664ed12bc724b4952eb2e34fb765115d49 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Jian Li6b86a762016-01-29 09:30:40 -080016package org.onosproject.cpman.impl;
Jian Li60804322015-12-02 14:46:31 -080017
Jian Li9f3a8852016-04-07 13:37:39 -070018import com.google.common.collect.ImmutableMap;
Jian Li7d180c52016-02-01 21:53:08 -080019import com.google.common.collect.ImmutableSet;
Jian Li85060ac2016-02-04 09:58:56 -080020import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
Jian Li60804322015-12-02 14:46:31 -080022import org.apache.felix.scr.annotations.Activate;
Jian Lic132c112016-01-28 20:27:34 -080023import org.apache.felix.scr.annotations.Component;
Jian Li60804322015-12-02 14:46:31 -080024import org.apache.felix.scr.annotations.Deactivate;
Jian Li7d180c52016-02-01 21:53:08 -080025import org.apache.felix.scr.annotations.Reference;
26import org.apache.felix.scr.annotations.ReferenceCardinality;
Jian Li60804322015-12-02 14:46:31 -080027import org.apache.felix.scr.annotations.Service;
Jian Li23906cc2016-03-31 11:16:44 -070028import org.onlab.util.KryoNamespace;
Jian Li7d180c52016-02-01 21:53:08 -080029import org.onosproject.cluster.ClusterService;
Jian Li60804322015-12-02 14:46:31 -080030import org.onosproject.cluster.NodeId;
Jian Li6b86a762016-01-29 09:30:40 -080031import org.onosproject.cpman.ControlLoad;
32import org.onosproject.cpman.ControlMetric;
33import org.onosproject.cpman.ControlMetricType;
34import org.onosproject.cpman.ControlPlaneMonitorService;
Jian Li7d180c52016-02-01 21:53:08 -080035import org.onosproject.cpman.MetricsDatabase;
Jian Li60804322015-12-02 14:46:31 -080036import org.onosproject.net.DeviceId;
Jian Li23906cc2016-03-31 11:16:44 -070037import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
38import org.onosproject.store.cluster.messaging.MessageSubject;
39import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.service.Serializer;
Jian Li60804322015-12-02 14:46:31 -080041import org.slf4j.Logger;
Jian Li7d180c52016-02-01 21:53:08 -080042import org.slf4j.LoggerFactory;
Jian Li60804322015-12-02 14:46:31 -080043
Jian Li7d180c52016-02-01 21:53:08 -080044import java.util.Map;
Jian Li60804322015-12-02 14:46:31 -080045import java.util.Optional;
Jian Lic5cb4a12016-02-03 23:24:42 -080046import java.util.Set;
Jian Li23906cc2016-03-31 11:16:44 -070047import java.util.concurrent.CompletableFuture;
48import java.util.concurrent.ExecutorService;
49import java.util.concurrent.Executors;
Jian Li85060ac2016-02-04 09:58:56 -080050import java.util.stream.Collectors;
Jian Li60804322015-12-02 14:46:31 -080051
Jian Li23906cc2016-03-31 11:16:44 -070052import static com.google.common.base.Preconditions.checkArgument;
53import static org.onlab.util.Tools.groupedThreads;
54import static org.onosproject.cpman.ControlResource.CONTROL_MESSAGE_METRICS;
55import static org.onosproject.cpman.ControlResource.CPU_METRICS;
56import static org.onosproject.cpman.ControlResource.DISK_METRICS;
57import static org.onosproject.cpman.ControlResource.MEMORY_METRICS;
58import static org.onosproject.cpman.ControlResource.NETWORK_METRICS;
59import static org.onosproject.cpman.ControlResource.Type;
Jian Li60804322015-12-02 14:46:31 -080060
61/**
62 * Control plane monitoring service class.
63 */
64@Component(immediate = true)
65@Service
66public class ControlPlaneMonitor implements ControlPlaneMonitorService {
67
Jian Li7d180c52016-02-01 21:53:08 -080068 private final Logger log = LoggerFactory.getLogger(getClass());
69 private MetricsDatabase cpuMetrics;
70 private MetricsDatabase memoryMetrics;
71 private Map<DeviceId, MetricsDatabase> controlMessageMap;
72 private Map<String, MetricsDatabase> diskMetricsMap;
73 private Map<String, MetricsDatabase> networkMetricsMap;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected ClusterService clusterService;
77
Jian Li23906cc2016-03-31 11:16:44 -070078 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected ClusterCommunicationService communicationService;
80
Jian Lidaf55ea2016-04-04 20:38:30 -070081 private static final String DEFAULT_RESOURCE = "default";
82
Jian Li85060ac2016-02-04 09:58:56 -080083 private static final Set RESOURCE_TYPE_SET =
84 ImmutableSet.of(Type.CONTROL_MESSAGE, Type.DISK, Type.NETWORK);
Jian Li7d180c52016-02-01 21:53:08 -080085
Jian Li23906cc2016-03-31 11:16:44 -070086 private static final MessageSubject CONTROL_STATS =
87 new MessageSubject("control-plane-stats");
88
Jian Li7d180c52016-02-01 21:53:08 -080089 private Map<ControlMetricType, Double> cpuBuf;
90 private Map<ControlMetricType, Double> memoryBuf;
91 private Map<String, Map<ControlMetricType, Double>> diskBuf;
92 private Map<String, Map<ControlMetricType, Double>> networkBuf;
93 private Map<DeviceId, Map<ControlMetricType, Double>> ctrlMsgBuf;
Jian Li60804322015-12-02 14:46:31 -080094
Jian Li85060ac2016-02-04 09:58:56 -080095 private Map<Type, Set<String>> availableResourceMap;
96 private Set<DeviceId> availableDeviceIdSet;
97
Jian Li23906cc2016-03-31 11:16:44 -070098 private ExecutorService messageHandlingExecutor;
99
100 private static final String METRIC_TYPE_NULL = "Control metric type cannot be null";
101
Jian Lidaf55ea2016-04-04 20:38:30 -0700102 Set<Map<ControlMetricType, Double>> debugSets = Sets.newHashSet();
103
Jian Li23906cc2016-03-31 11:16:44 -0700104 private static final Serializer SERIALIZER = Serializer
105 .using(new KryoNamespace.Builder()
106 .register(KryoNamespaces.API)
107 .register(ControlMetricsRequest.class)
108 .register(DefaultControlLoad.class)
109 .register(DefaultMetricsDatabase.class)
110 .register(ControlMetricType.class)
111 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());
112
Jian Li60804322015-12-02 14:46:31 -0800113 @Activate
114 public void activate() {
Jian Lidaf55ea2016-04-04 20:38:30 -0700115 cpuMetrics = genMDbBuilder(DEFAULT_RESOURCE, Type.CPU, CPU_METRICS);
116 memoryMetrics = genMDbBuilder(DEFAULT_RESOURCE, Type.MEMORY, MEMORY_METRICS);
Jian Li85060ac2016-02-04 09:58:56 -0800117 controlMessageMap = Maps.newConcurrentMap();
118 diskMetricsMap = Maps.newConcurrentMap();
119 networkMetricsMap = Maps.newConcurrentMap();
Jian Li7d180c52016-02-01 21:53:08 -0800120
Jian Li85060ac2016-02-04 09:58:56 -0800121 cpuBuf = Maps.newConcurrentMap();
122 memoryBuf = Maps.newConcurrentMap();
123 diskBuf = Maps.newConcurrentMap();
124 networkBuf = Maps.newConcurrentMap();
125 ctrlMsgBuf = Maps.newConcurrentMap();
126
127 availableResourceMap = Maps.newConcurrentMap();
128 availableDeviceIdSet = Sets.newConcurrentHashSet();
Jian Li7d180c52016-02-01 21:53:08 -0800129
Jian Li23906cc2016-03-31 11:16:44 -0700130 messageHandlingExecutor = Executors.newSingleThreadScheduledExecutor(
HIGUCHI Yutad9e01052016-04-14 09:31:42 -0700131 groupedThreads("onos/app/cpman", "message-handlers", log));
Jian Li23906cc2016-03-31 11:16:44 -0700132
133 communicationService.addSubscriber(CONTROL_STATS,
134 SERIALIZER::decode, this::handleRequest, messageHandlingExecutor);
135
Jian Li7d180c52016-02-01 21:53:08 -0800136 log.info("Started");
Jian Li60804322015-12-02 14:46:31 -0800137 }
138
139 @Deactivate
140 public void deactivate() {
Jian Li7d180c52016-02-01 21:53:08 -0800141
142 // TODO: need to handle the mdb close.
143 cpuBuf.clear();
144 memoryBuf.clear();
145 diskBuf.clear();
146 networkBuf.clear();
147 ctrlMsgBuf.clear();
148
Jian Li23906cc2016-03-31 11:16:44 -0700149 communicationService.removeSubscriber(CONTROL_STATS);
150
Jian Li7d180c52016-02-01 21:53:08 -0800151 log.info("Stopped");
Jian Li60804322015-12-02 14:46:31 -0800152 }
153
Jian Li60804322015-12-02 14:46:31 -0800154 @Override
Jian Lic5cb4a12016-02-03 23:24:42 -0800155 public void updateMetric(ControlMetric cm, int updateIntervalInMinutes,
Jian Li60804322015-12-02 14:46:31 -0800156 Optional<DeviceId> deviceId) {
Jian Li7d180c52016-02-01 21:53:08 -0800157 if (deviceId.isPresent()) {
Jian Li46148902016-01-29 13:33:50 -0800158
Jian Li7d180c52016-02-01 21:53:08 -0800159 // insert a new device entry if we cannot find any
Jian Li85060ac2016-02-04 09:58:56 -0800160 ctrlMsgBuf.putIfAbsent(deviceId.get(), Maps.newConcurrentMap());
Jian Li7d180c52016-02-01 21:53:08 -0800161
162 // update control message metrics
Jian Li85060ac2016-02-04 09:58:56 -0800163 if (CONTROL_MESSAGE_METRICS.contains(cm.metricType())) {
164
165 if (!availableDeviceIdSet.contains(deviceId.get())) {
166 availableDeviceIdSet.add(deviceId.get());
167 }
Jian Li7d180c52016-02-01 21:53:08 -0800168
169 // we will accumulate the metric value into buffer first
170 ctrlMsgBuf.get(deviceId.get()).putIfAbsent(cm.metricType(),
171 (double) cm.metricValue().getLoad());
172
173 // if buffer contains all control message metrics,
174 // we simply set and update the values into MetricsDatabase.
Jian Li85060ac2016-02-04 09:58:56 -0800175 if (ctrlMsgBuf.get(deviceId.get()).keySet()
176 .containsAll(CONTROL_MESSAGE_METRICS)) {
Jian Li7d180c52016-02-01 21:53:08 -0800177 updateControlMessages(ctrlMsgBuf.get(deviceId.get()), deviceId.get());
Jian Lidaf55ea2016-04-04 20:38:30 -0700178 ctrlMsgBuf.get(deviceId.get());
Jian Li7d180c52016-02-01 21:53:08 -0800179 }
180 }
181 } else {
182
183 // update cpu metrics
184 if (CPU_METRICS.contains(cm.metricType())) {
185 cpuBuf.putIfAbsent(cm.metricType(),
186 (double) cm.metricValue().getLoad());
187 if (cpuBuf.keySet().containsAll(CPU_METRICS)) {
188 cpuMetrics.updateMetrics(convertMap(cpuBuf));
189 cpuBuf.clear();
190 }
191 }
192
193 // update memory metrics
194 if (MEMORY_METRICS.contains(cm.metricType())) {
195 memoryBuf.putIfAbsent(cm.metricType(),
196 (double) cm.metricValue().getLoad());
197 if (memoryBuf.keySet().containsAll(MEMORY_METRICS)) {
198 memoryMetrics.updateMetrics(convertMap(memoryBuf));
199 memoryBuf.clear();
200 }
201 }
202 }
Jian Li60804322015-12-02 14:46:31 -0800203 }
204
205 @Override
Jian Lic5cb4a12016-02-03 23:24:42 -0800206 public void updateMetric(ControlMetric cm, int updateIntervalInMinutes,
Jian Lie044d1a2016-01-25 09:01:20 -0800207 String resourceName) {
Jian Li7d180c52016-02-01 21:53:08 -0800208 // update disk metrics
209 if (DISK_METRICS.contains(cm.metricType())) {
Jian Li85060ac2016-02-04 09:58:56 -0800210 diskBuf.putIfAbsent(resourceName, Maps.newConcurrentMap());
211
212 availableResourceMap.putIfAbsent(Type.DISK, Sets.newHashSet());
213 availableResourceMap.computeIfPresent(Type.DISK, (k, v) -> {
214 v.add(resourceName);
215 return v;
216 });
217
Jian Li7d180c52016-02-01 21:53:08 -0800218 diskBuf.get(resourceName).putIfAbsent(cm.metricType(),
219 (double) cm.metricValue().getLoad());
220 if (diskBuf.get(resourceName).keySet().containsAll(DISK_METRICS)) {
221 updateDiskMetrics(diskBuf.get(resourceName), resourceName);
222 diskBuf.clear();
223 }
224 }
Jian Lie044d1a2016-01-25 09:01:20 -0800225
Jian Li7d180c52016-02-01 21:53:08 -0800226 // update network metrics
227 if (NETWORK_METRICS.contains(cm.metricType())) {
Jian Li85060ac2016-02-04 09:58:56 -0800228 networkBuf.putIfAbsent(resourceName, Maps.newConcurrentMap());
229
230 availableResourceMap.putIfAbsent(Type.NETWORK, Sets.newHashSet());
231 availableResourceMap.computeIfPresent(Type.NETWORK, (k, v) -> {
232 v.add(resourceName);
233 return v;
234 });
235
Jian Li7d180c52016-02-01 21:53:08 -0800236 networkBuf.get(resourceName).putIfAbsent(cm.metricType(),
237 (double) cm.metricValue().getLoad());
238 if (networkBuf.get(resourceName).keySet().containsAll(NETWORK_METRICS)) {
239 updateNetworkMetrics(networkBuf.get(resourceName), resourceName);
240 networkBuf.clear();
241 }
242 }
Jian Lie044d1a2016-01-25 09:01:20 -0800243 }
244
245 @Override
Jian Li23906cc2016-03-31 11:16:44 -0700246 public ControlLoad getLocalLoad(ControlMetricType type,
247 Optional<DeviceId> deviceId) {
248 if (deviceId.isPresent()) {
249 if (CONTROL_MESSAGE_METRICS.contains(type) &&
250 availableDeviceIdSet.contains(deviceId.get())) {
251 return new DefaultControlLoad(controlMessageMap.get(deviceId.get()), type);
Jian Li7d180c52016-02-01 21:53:08 -0800252 }
253 } else {
Jian Li23906cc2016-03-31 11:16:44 -0700254 // returns controlLoad of CPU metrics
255 if (CPU_METRICS.contains(type)) {
256 return new DefaultControlLoad(cpuMetrics, type);
257 }
258
259 // returns memoryLoad of memory metrics
260 if (MEMORY_METRICS.contains(type)) {
261 return new DefaultControlLoad(memoryMetrics, type);
262 }
Jian Li7d180c52016-02-01 21:53:08 -0800263 }
Jian Li60804322015-12-02 14:46:31 -0800264 return null;
265 }
266
267 @Override
Jian Li23906cc2016-03-31 11:16:44 -0700268 public ControlLoad getLocalLoad(ControlMetricType type, String resourceName) {
269 if (DISK_METRICS.contains(type) &&
270 availableResources(Type.DISK).contains(resourceName)) {
271 return new DefaultControlLoad(diskMetricsMap.get(resourceName), type);
272 }
Jian Li7d180c52016-02-01 21:53:08 -0800273
Jian Li23906cc2016-03-31 11:16:44 -0700274 if (NETWORK_METRICS.contains(type) &&
275 availableResources(Type.NETWORK).contains(resourceName)) {
276 return new DefaultControlLoad(networkMetricsMap.get(resourceName), type);
Jian Li7d180c52016-02-01 21:53:08 -0800277 }
Jian Li60804322015-12-02 14:46:31 -0800278 return null;
279 }
Jian Li7d180c52016-02-01 21:53:08 -0800280
Jian Li85060ac2016-02-04 09:58:56 -0800281 @Override
Jian Li23906cc2016-03-31 11:16:44 -0700282 public CompletableFuture<ControlLoad> getRemoteLoad(NodeId nodeId,
283 ControlMetricType type,
284 Optional<DeviceId> deviceId) {
285 return communicationService.sendAndReceive(createRequest(type, deviceId),
286 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
287 }
288
289 @Override
290 public CompletableFuture<ControlLoad> getRemoteLoad(NodeId nodeId,
291 ControlMetricType type,
292 String resourceName) {
293 return communicationService.sendAndReceive(createRequest(type, resourceName),
294 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
295 }
296
297
298 @Override
Jian Li85060ac2016-02-04 09:58:56 -0800299 public Set<String> availableResources(Type resourceType) {
300 if (RESOURCE_TYPE_SET.contains(resourceType)) {
301 if (Type.CONTROL_MESSAGE.equals(resourceType)) {
302 return availableDeviceIdSet.stream().map(id ->
303 id.toString()).collect(Collectors.toSet());
304 } else {
Jian Li4563aa22016-04-04 14:57:38 -0700305 Set<String> res = availableResourceMap.get(resourceType);
306 return res == null ? ImmutableSet.of() : res;
Jian Li85060ac2016-02-04 09:58:56 -0800307 }
308 }
Jian Li4563aa22016-04-04 14:57:38 -0700309 return ImmutableSet.of();
Jian Li85060ac2016-02-04 09:58:56 -0800310 }
311
Jian Lidaf55ea2016-04-04 20:38:30 -0700312 private MetricsDatabase genMDbBuilder(String resourceName,
313 Type resourceType,
Jian Lic5cb4a12016-02-03 23:24:42 -0800314 Set<ControlMetricType> metricTypes) {
Jian Li7d180c52016-02-01 21:53:08 -0800315 MetricsDatabase.Builder builder = new DefaultMetricsDatabase.Builder();
Jian Li85060ac2016-02-04 09:58:56 -0800316 builder.withMetricName(resourceType.toString());
Jian Lidaf55ea2016-04-04 20:38:30 -0700317 builder.withResourceName(resourceName);
Jian Li7d180c52016-02-01 21:53:08 -0800318 metricTypes.forEach(type -> builder.addMetricType(type.toString()));
319 return builder.build();
320 }
321
322 private void updateNetworkMetrics(Map<ControlMetricType, Double> metricMap,
323 String resName) {
Jian Lidaf55ea2016-04-04 20:38:30 -0700324 networkMetricsMap.putIfAbsent(resName, genMDbBuilder(resName,
325 Type.NETWORK, NETWORK_METRICS));
Jian Li7d180c52016-02-01 21:53:08 -0800326 networkMetricsMap.get(resName).updateMetrics(convertMap(metricMap));
327 }
328
329 private void updateDiskMetrics(Map<ControlMetricType, Double> metricMap,
330 String resName) {
Jian Lidaf55ea2016-04-04 20:38:30 -0700331 diskMetricsMap.putIfAbsent(resName, genMDbBuilder(resName,
332 Type.DISK, DISK_METRICS));
Jian Li7d180c52016-02-01 21:53:08 -0800333 diskMetricsMap.get(resName).updateMetrics(convertMap(metricMap));
334 }
335
336 private void updateControlMessages(Map<ControlMetricType, Double> metricMap,
337 DeviceId devId) {
Jian Lidaf55ea2016-04-04 20:38:30 -0700338 controlMessageMap.putIfAbsent(devId, genMDbBuilder(devId.toString(),
339 Type.CONTROL_MESSAGE, CONTROL_MESSAGE_METRICS));
Jian Li7d180c52016-02-01 21:53:08 -0800340 controlMessageMap.get(devId).updateMetrics(convertMap(metricMap));
341 }
342
343 private Map convertMap(Map<ControlMetricType, Double> map) {
Jian Li9f3a8852016-04-07 13:37:39 -0700344 if (map == null) {
345 return ImmutableMap.of();
346 }
Jian Li85060ac2016-02-04 09:58:56 -0800347 Map newMap = Maps.newConcurrentMap();
Jian Li7d180c52016-02-01 21:53:08 -0800348 map.forEach((k, v) -> newMap.putIfAbsent(k.toString(), v));
349 return newMap;
350 }
Jian Li23906cc2016-03-31 11:16:44 -0700351
352 private CompletableFuture<ControlLoad> handleRequest(ControlMetricsRequest request) {
353
354 checkArgument(request.getType() != null, METRIC_TYPE_NULL);
355
356 ControlLoad load;
357 if (request.getResourceName() != null) {
358 load = getLocalLoad(request.getType(), request.getResourceName());
359 } else {
360 load = getLocalLoad(request.getType(), request.getDeviceId());
361 }
362 return CompletableFuture.completedFuture(load);
363 }
364
365 private ControlMetricsRequest createRequest(ControlMetricType type,
366 Optional<DeviceId> deviceId) {
367 return new ControlMetricsRequest(type, deviceId);
368 }
369
370 private ControlMetricsRequest createRequest(ControlMetricType type,
371 String resourceName) {
372 return new ControlMetricsRequest(type, resourceName);
373 }
374}