blob: fa971b6c6178460185865f2a31a774931b4bf5fe [file] [log] [blame]
Jian Li60804322015-12-02 14:46:31 -08001/*
Jian Lie044d1a2016-01-25 09:01:20 -08002 * Copyright 2015-2016 Open Networking Laboratory
Jian Li60804322015-12-02 14:46:31 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jian Li6b86a762016-01-29 09:30:40 -080016package org.onosproject.cpman.impl;
Jian Li60804322015-12-02 14:46:31 -080017
Jian Li7d180c52016-02-01 21:53:08 -080018import com.google.common.collect.ImmutableSet;
Jian Li85060ac2016-02-04 09:58:56 -080019import com.google.common.collect.Maps;
20import com.google.common.collect.Sets;
Jian Li60804322015-12-02 14:46:31 -080021import org.apache.felix.scr.annotations.Activate;
Jian Lic132c112016-01-28 20:27:34 -080022import org.apache.felix.scr.annotations.Component;
Jian Li60804322015-12-02 14:46:31 -080023import org.apache.felix.scr.annotations.Deactivate;
Jian Li7d180c52016-02-01 21:53:08 -080024import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
Jian Li60804322015-12-02 14:46:31 -080026import org.apache.felix.scr.annotations.Service;
Jian Li23906cc2016-03-31 11:16:44 -070027import org.onlab.util.KryoNamespace;
Jian Li7d180c52016-02-01 21:53:08 -080028import org.onosproject.cluster.ClusterService;
Jian Li60804322015-12-02 14:46:31 -080029import org.onosproject.cluster.NodeId;
Jian Li6b86a762016-01-29 09:30:40 -080030import org.onosproject.cpman.ControlLoad;
31import org.onosproject.cpman.ControlMetric;
32import org.onosproject.cpman.ControlMetricType;
33import org.onosproject.cpman.ControlPlaneMonitorService;
Jian Li7d180c52016-02-01 21:53:08 -080034import org.onosproject.cpman.MetricsDatabase;
Jian Li60804322015-12-02 14:46:31 -080035import org.onosproject.net.DeviceId;
Jian Li23906cc2016-03-31 11:16:44 -070036import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
37import org.onosproject.store.cluster.messaging.MessageSubject;
38import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.service.Serializer;
Jian Li60804322015-12-02 14:46:31 -080040import org.slf4j.Logger;
Jian Li7d180c52016-02-01 21:53:08 -080041import org.slf4j.LoggerFactory;
Jian Li60804322015-12-02 14:46:31 -080042
Jian Li7d180c52016-02-01 21:53:08 -080043import java.util.Map;
Jian Li60804322015-12-02 14:46:31 -080044import java.util.Optional;
Jian Lic5cb4a12016-02-03 23:24:42 -080045import java.util.Set;
Jian Li23906cc2016-03-31 11:16:44 -070046import java.util.concurrent.CompletableFuture;
47import java.util.concurrent.ExecutorService;
48import java.util.concurrent.Executors;
Jian Li85060ac2016-02-04 09:58:56 -080049import java.util.stream.Collectors;
Jian Li60804322015-12-02 14:46:31 -080050
Jian Li23906cc2016-03-31 11:16:44 -070051import static com.google.common.base.Preconditions.checkArgument;
52import static org.onlab.util.Tools.groupedThreads;
53import static org.onosproject.cpman.ControlResource.CONTROL_MESSAGE_METRICS;
54import static org.onosproject.cpman.ControlResource.CPU_METRICS;
55import static org.onosproject.cpman.ControlResource.DISK_METRICS;
56import static org.onosproject.cpman.ControlResource.MEMORY_METRICS;
57import static org.onosproject.cpman.ControlResource.NETWORK_METRICS;
58import static org.onosproject.cpman.ControlResource.Type;
Jian Li60804322015-12-02 14:46:31 -080059
60/**
61 * Control plane monitoring service class.
62 */
63@Component(immediate = true)
64@Service
65public class ControlPlaneMonitor implements ControlPlaneMonitorService {
66
Jian Li7d180c52016-02-01 21:53:08 -080067 private final Logger log = LoggerFactory.getLogger(getClass());
68 private MetricsDatabase cpuMetrics;
69 private MetricsDatabase memoryMetrics;
70 private Map<DeviceId, MetricsDatabase> controlMessageMap;
71 private Map<String, MetricsDatabase> diskMetricsMap;
72 private Map<String, MetricsDatabase> networkMetricsMap;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected ClusterService clusterService;
76
Jian Li23906cc2016-03-31 11:16:44 -070077 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected ClusterCommunicationService communicationService;
79
Jian Lidaf55ea2016-04-04 20:38:30 -070080 private static final String DEFAULT_RESOURCE = "default";
81
Jian Li85060ac2016-02-04 09:58:56 -080082 private static final Set RESOURCE_TYPE_SET =
83 ImmutableSet.of(Type.CONTROL_MESSAGE, Type.DISK, Type.NETWORK);
Jian Li7d180c52016-02-01 21:53:08 -080084
Jian Li23906cc2016-03-31 11:16:44 -070085 private static final MessageSubject CONTROL_STATS =
86 new MessageSubject("control-plane-stats");
87
Jian Li7d180c52016-02-01 21:53:08 -080088 private Map<ControlMetricType, Double> cpuBuf;
89 private Map<ControlMetricType, Double> memoryBuf;
90 private Map<String, Map<ControlMetricType, Double>> diskBuf;
91 private Map<String, Map<ControlMetricType, Double>> networkBuf;
92 private Map<DeviceId, Map<ControlMetricType, Double>> ctrlMsgBuf;
Jian Li60804322015-12-02 14:46:31 -080093
Jian Li85060ac2016-02-04 09:58:56 -080094 private Map<Type, Set<String>> availableResourceMap;
95 private Set<DeviceId> availableDeviceIdSet;
96
Jian Li23906cc2016-03-31 11:16:44 -070097 private ExecutorService messageHandlingExecutor;
98
99 private static final String METRIC_TYPE_NULL = "Control metric type cannot be null";
100
Jian Lidaf55ea2016-04-04 20:38:30 -0700101 Set<Map<ControlMetricType, Double>> debugSets = Sets.newHashSet();
102
Jian Li23906cc2016-03-31 11:16:44 -0700103 private static final Serializer SERIALIZER = Serializer
104 .using(new KryoNamespace.Builder()
105 .register(KryoNamespaces.API)
106 .register(ControlMetricsRequest.class)
107 .register(DefaultControlLoad.class)
108 .register(DefaultMetricsDatabase.class)
109 .register(ControlMetricType.class)
110 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());
111
Jian Li60804322015-12-02 14:46:31 -0800112 @Activate
113 public void activate() {
Jian Lidaf55ea2016-04-04 20:38:30 -0700114 cpuMetrics = genMDbBuilder(DEFAULT_RESOURCE, Type.CPU, CPU_METRICS);
115 memoryMetrics = genMDbBuilder(DEFAULT_RESOURCE, Type.MEMORY, MEMORY_METRICS);
Jian Li85060ac2016-02-04 09:58:56 -0800116 controlMessageMap = Maps.newConcurrentMap();
117 diskMetricsMap = Maps.newConcurrentMap();
118 networkMetricsMap = Maps.newConcurrentMap();
Jian Li7d180c52016-02-01 21:53:08 -0800119
Jian Li85060ac2016-02-04 09:58:56 -0800120 cpuBuf = Maps.newConcurrentMap();
121 memoryBuf = Maps.newConcurrentMap();
122 diskBuf = Maps.newConcurrentMap();
123 networkBuf = Maps.newConcurrentMap();
124 ctrlMsgBuf = Maps.newConcurrentMap();
125
126 availableResourceMap = Maps.newConcurrentMap();
127 availableDeviceIdSet = Sets.newConcurrentHashSet();
Jian Li7d180c52016-02-01 21:53:08 -0800128
Jian Li23906cc2016-03-31 11:16:44 -0700129 messageHandlingExecutor = Executors.newSingleThreadScheduledExecutor(
130 groupedThreads("onos/app/cpman", "message-handlers"));
131
132 communicationService.addSubscriber(CONTROL_STATS,
133 SERIALIZER::decode, this::handleRequest, messageHandlingExecutor);
134
Jian Li7d180c52016-02-01 21:53:08 -0800135 log.info("Started");
Jian Li60804322015-12-02 14:46:31 -0800136 }
137
138 @Deactivate
139 public void deactivate() {
Jian Li7d180c52016-02-01 21:53:08 -0800140
141 // TODO: need to handle the mdb close.
142 cpuBuf.clear();
143 memoryBuf.clear();
144 diskBuf.clear();
145 networkBuf.clear();
146 ctrlMsgBuf.clear();
147
Jian Li23906cc2016-03-31 11:16:44 -0700148 communicationService.removeSubscriber(CONTROL_STATS);
149
Jian Li7d180c52016-02-01 21:53:08 -0800150 log.info("Stopped");
Jian Li60804322015-12-02 14:46:31 -0800151 }
152
Jian Li60804322015-12-02 14:46:31 -0800153 @Override
Jian Lic5cb4a12016-02-03 23:24:42 -0800154 public void updateMetric(ControlMetric cm, int updateIntervalInMinutes,
Jian Li60804322015-12-02 14:46:31 -0800155 Optional<DeviceId> deviceId) {
Jian Li7d180c52016-02-01 21:53:08 -0800156 if (deviceId.isPresent()) {
Jian Li46148902016-01-29 13:33:50 -0800157
Jian Li7d180c52016-02-01 21:53:08 -0800158 // insert a new device entry if we cannot find any
Jian Li85060ac2016-02-04 09:58:56 -0800159 ctrlMsgBuf.putIfAbsent(deviceId.get(), Maps.newConcurrentMap());
Jian Li7d180c52016-02-01 21:53:08 -0800160
161 // update control message metrics
Jian Li85060ac2016-02-04 09:58:56 -0800162 if (CONTROL_MESSAGE_METRICS.contains(cm.metricType())) {
163
164 if (!availableDeviceIdSet.contains(deviceId.get())) {
165 availableDeviceIdSet.add(deviceId.get());
166 }
Jian Li7d180c52016-02-01 21:53:08 -0800167
168 // we will accumulate the metric value into buffer first
169 ctrlMsgBuf.get(deviceId.get()).putIfAbsent(cm.metricType(),
170 (double) cm.metricValue().getLoad());
171
172 // if buffer contains all control message metrics,
173 // we simply set and update the values into MetricsDatabase.
Jian Li85060ac2016-02-04 09:58:56 -0800174 if (ctrlMsgBuf.get(deviceId.get()).keySet()
175 .containsAll(CONTROL_MESSAGE_METRICS)) {
Jian Li7d180c52016-02-01 21:53:08 -0800176 updateControlMessages(ctrlMsgBuf.get(deviceId.get()), deviceId.get());
Jian Lidaf55ea2016-04-04 20:38:30 -0700177 ctrlMsgBuf.get(deviceId.get());
Jian Li7d180c52016-02-01 21:53:08 -0800178 }
179 }
180 } else {
181
182 // update cpu metrics
183 if (CPU_METRICS.contains(cm.metricType())) {
184 cpuBuf.putIfAbsent(cm.metricType(),
185 (double) cm.metricValue().getLoad());
186 if (cpuBuf.keySet().containsAll(CPU_METRICS)) {
187 cpuMetrics.updateMetrics(convertMap(cpuBuf));
188 cpuBuf.clear();
189 }
190 }
191
192 // update memory metrics
193 if (MEMORY_METRICS.contains(cm.metricType())) {
194 memoryBuf.putIfAbsent(cm.metricType(),
195 (double) cm.metricValue().getLoad());
196 if (memoryBuf.keySet().containsAll(MEMORY_METRICS)) {
197 memoryMetrics.updateMetrics(convertMap(memoryBuf));
198 memoryBuf.clear();
199 }
200 }
201 }
Jian Li60804322015-12-02 14:46:31 -0800202 }
203
204 @Override
Jian Lic5cb4a12016-02-03 23:24:42 -0800205 public void updateMetric(ControlMetric cm, int updateIntervalInMinutes,
Jian Lie044d1a2016-01-25 09:01:20 -0800206 String resourceName) {
Jian Li7d180c52016-02-01 21:53:08 -0800207 // update disk metrics
208 if (DISK_METRICS.contains(cm.metricType())) {
Jian Li85060ac2016-02-04 09:58:56 -0800209 diskBuf.putIfAbsent(resourceName, Maps.newConcurrentMap());
210
211 availableResourceMap.putIfAbsent(Type.DISK, Sets.newHashSet());
212 availableResourceMap.computeIfPresent(Type.DISK, (k, v) -> {
213 v.add(resourceName);
214 return v;
215 });
216
Jian Li7d180c52016-02-01 21:53:08 -0800217 diskBuf.get(resourceName).putIfAbsent(cm.metricType(),
218 (double) cm.metricValue().getLoad());
219 if (diskBuf.get(resourceName).keySet().containsAll(DISK_METRICS)) {
220 updateDiskMetrics(diskBuf.get(resourceName), resourceName);
221 diskBuf.clear();
222 }
223 }
Jian Lie044d1a2016-01-25 09:01:20 -0800224
Jian Li7d180c52016-02-01 21:53:08 -0800225 // update network metrics
226 if (NETWORK_METRICS.contains(cm.metricType())) {
Jian Li85060ac2016-02-04 09:58:56 -0800227 networkBuf.putIfAbsent(resourceName, Maps.newConcurrentMap());
228
229 availableResourceMap.putIfAbsent(Type.NETWORK, Sets.newHashSet());
230 availableResourceMap.computeIfPresent(Type.NETWORK, (k, v) -> {
231 v.add(resourceName);
232 return v;
233 });
234
Jian Li7d180c52016-02-01 21:53:08 -0800235 networkBuf.get(resourceName).putIfAbsent(cm.metricType(),
236 (double) cm.metricValue().getLoad());
237 if (networkBuf.get(resourceName).keySet().containsAll(NETWORK_METRICS)) {
238 updateNetworkMetrics(networkBuf.get(resourceName), resourceName);
239 networkBuf.clear();
240 }
241 }
Jian Lie044d1a2016-01-25 09:01:20 -0800242 }
243
244 @Override
Jian Li23906cc2016-03-31 11:16:44 -0700245 public ControlLoad getLocalLoad(ControlMetricType type,
246 Optional<DeviceId> deviceId) {
247 if (deviceId.isPresent()) {
248 if (CONTROL_MESSAGE_METRICS.contains(type) &&
249 availableDeviceIdSet.contains(deviceId.get())) {
250 return new DefaultControlLoad(controlMessageMap.get(deviceId.get()), type);
Jian Li7d180c52016-02-01 21:53:08 -0800251 }
252 } else {
Jian Li23906cc2016-03-31 11:16:44 -0700253 // returns controlLoad of CPU metrics
254 if (CPU_METRICS.contains(type)) {
255 return new DefaultControlLoad(cpuMetrics, type);
256 }
257
258 // returns memoryLoad of memory metrics
259 if (MEMORY_METRICS.contains(type)) {
260 return new DefaultControlLoad(memoryMetrics, type);
261 }
Jian Li7d180c52016-02-01 21:53:08 -0800262 }
Jian Li60804322015-12-02 14:46:31 -0800263 return null;
264 }
265
266 @Override
Jian Li23906cc2016-03-31 11:16:44 -0700267 public ControlLoad getLocalLoad(ControlMetricType type, String resourceName) {
268 if (DISK_METRICS.contains(type) &&
269 availableResources(Type.DISK).contains(resourceName)) {
270 return new DefaultControlLoad(diskMetricsMap.get(resourceName), type);
271 }
Jian Li7d180c52016-02-01 21:53:08 -0800272
Jian Li23906cc2016-03-31 11:16:44 -0700273 if (NETWORK_METRICS.contains(type) &&
274 availableResources(Type.NETWORK).contains(resourceName)) {
275 return new DefaultControlLoad(networkMetricsMap.get(resourceName), type);
Jian Li7d180c52016-02-01 21:53:08 -0800276 }
Jian Li60804322015-12-02 14:46:31 -0800277 return null;
278 }
Jian Li7d180c52016-02-01 21:53:08 -0800279
Jian Li85060ac2016-02-04 09:58:56 -0800280 @Override
Jian Li23906cc2016-03-31 11:16:44 -0700281 public CompletableFuture<ControlLoad> getRemoteLoad(NodeId nodeId,
282 ControlMetricType type,
283 Optional<DeviceId> deviceId) {
284 return communicationService.sendAndReceive(createRequest(type, deviceId),
285 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
286 }
287
288 @Override
289 public CompletableFuture<ControlLoad> getRemoteLoad(NodeId nodeId,
290 ControlMetricType type,
291 String resourceName) {
292 return communicationService.sendAndReceive(createRequest(type, resourceName),
293 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
294 }
295
296
297 @Override
Jian Li85060ac2016-02-04 09:58:56 -0800298 public Set<String> availableResources(Type resourceType) {
299 if (RESOURCE_TYPE_SET.contains(resourceType)) {
300 if (Type.CONTROL_MESSAGE.equals(resourceType)) {
301 return availableDeviceIdSet.stream().map(id ->
302 id.toString()).collect(Collectors.toSet());
303 } else {
Jian Li4563aa22016-04-04 14:57:38 -0700304 Set<String> res = availableResourceMap.get(resourceType);
305 return res == null ? ImmutableSet.of() : res;
Jian Li85060ac2016-02-04 09:58:56 -0800306 }
307 }
Jian Li4563aa22016-04-04 14:57:38 -0700308 return ImmutableSet.of();
Jian Li85060ac2016-02-04 09:58:56 -0800309 }
310
Jian Lidaf55ea2016-04-04 20:38:30 -0700311 private MetricsDatabase genMDbBuilder(String resourceName,
312 Type resourceType,
Jian Lic5cb4a12016-02-03 23:24:42 -0800313 Set<ControlMetricType> metricTypes) {
Jian Li7d180c52016-02-01 21:53:08 -0800314 MetricsDatabase.Builder builder = new DefaultMetricsDatabase.Builder();
Jian Li85060ac2016-02-04 09:58:56 -0800315 builder.withMetricName(resourceType.toString());
Jian Lidaf55ea2016-04-04 20:38:30 -0700316 builder.withResourceName(resourceName);
Jian Li7d180c52016-02-01 21:53:08 -0800317 metricTypes.forEach(type -> builder.addMetricType(type.toString()));
318 return builder.build();
319 }
320
321 private void updateNetworkMetrics(Map<ControlMetricType, Double> metricMap,
322 String resName) {
Jian Lidaf55ea2016-04-04 20:38:30 -0700323 networkMetricsMap.putIfAbsent(resName, genMDbBuilder(resName,
324 Type.NETWORK, NETWORK_METRICS));
Jian Li7d180c52016-02-01 21:53:08 -0800325 networkMetricsMap.get(resName).updateMetrics(convertMap(metricMap));
326 }
327
328 private void updateDiskMetrics(Map<ControlMetricType, Double> metricMap,
329 String resName) {
Jian Lidaf55ea2016-04-04 20:38:30 -0700330 diskMetricsMap.putIfAbsent(resName, genMDbBuilder(resName,
331 Type.DISK, DISK_METRICS));
Jian Li7d180c52016-02-01 21:53:08 -0800332 diskMetricsMap.get(resName).updateMetrics(convertMap(metricMap));
333 }
334
335 private void updateControlMessages(Map<ControlMetricType, Double> metricMap,
336 DeviceId devId) {
Jian Lidaf55ea2016-04-04 20:38:30 -0700337 controlMessageMap.putIfAbsent(devId, genMDbBuilder(devId.toString(),
338 Type.CONTROL_MESSAGE, CONTROL_MESSAGE_METRICS));
Jian Li7d180c52016-02-01 21:53:08 -0800339 controlMessageMap.get(devId).updateMetrics(convertMap(metricMap));
340 }
341
342 private Map convertMap(Map<ControlMetricType, Double> map) {
Jian Li85060ac2016-02-04 09:58:56 -0800343 Map newMap = Maps.newConcurrentMap();
Jian Li7d180c52016-02-01 21:53:08 -0800344 map.forEach((k, v) -> newMap.putIfAbsent(k.toString(), v));
345 return newMap;
346 }
Jian Li23906cc2016-03-31 11:16:44 -0700347
348 private CompletableFuture<ControlLoad> handleRequest(ControlMetricsRequest request) {
349
350 checkArgument(request.getType() != null, METRIC_TYPE_NULL);
351
352 ControlLoad load;
353 if (request.getResourceName() != null) {
354 load = getLocalLoad(request.getType(), request.getResourceName());
355 } else {
356 load = getLocalLoad(request.getType(), request.getDeviceId());
357 }
358 return CompletableFuture.completedFuture(load);
359 }
360
361 private ControlMetricsRequest createRequest(ControlMetricType type,
362 Optional<DeviceId> deviceId) {
363 return new ControlMetricsRequest(type, deviceId);
364 }
365
366 private ControlMetricsRequest createRequest(ControlMetricType type,
367 String resourceName) {
368 return new ControlMetricsRequest(type, resourceName);
369 }
370}