blob: 9b276c3792dd02432557b0d035466290477a1dc3 [file] [log] [blame]
Jian Li60804322015-12-02 14:46:31 -08001/*
Jian Lie044d1a2016-01-25 09:01:20 -08002 * Copyright 2015-2016 Open Networking Laboratory
Jian Li60804322015-12-02 14:46:31 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Jian Li6b86a762016-01-29 09:30:40 -080016package org.onosproject.cpman.impl;
Jian Li60804322015-12-02 14:46:31 -080017
Jian Li7d180c52016-02-01 21:53:08 -080018import com.google.common.collect.ImmutableSet;
Jian Li85060ac2016-02-04 09:58:56 -080019import com.google.common.collect.Maps;
20import com.google.common.collect.Sets;
Jian Li60804322015-12-02 14:46:31 -080021import org.apache.felix.scr.annotations.Activate;
Jian Lic132c112016-01-28 20:27:34 -080022import org.apache.felix.scr.annotations.Component;
Jian Li60804322015-12-02 14:46:31 -080023import org.apache.felix.scr.annotations.Deactivate;
Jian Li7d180c52016-02-01 21:53:08 -080024import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
Jian Li60804322015-12-02 14:46:31 -080026import org.apache.felix.scr.annotations.Service;
Jian Li23906cc2016-03-31 11:16:44 -070027import org.onlab.util.KryoNamespace;
Jian Li7d180c52016-02-01 21:53:08 -080028import org.onosproject.cluster.ClusterService;
Jian Li60804322015-12-02 14:46:31 -080029import org.onosproject.cluster.NodeId;
Jian Li6b86a762016-01-29 09:30:40 -080030import org.onosproject.cpman.ControlLoad;
31import org.onosproject.cpman.ControlMetric;
32import org.onosproject.cpman.ControlMetricType;
33import org.onosproject.cpman.ControlPlaneMonitorService;
Jian Li7d180c52016-02-01 21:53:08 -080034import org.onosproject.cpman.MetricsDatabase;
Jian Li60804322015-12-02 14:46:31 -080035import org.onosproject.net.DeviceId;
Jian Li23906cc2016-03-31 11:16:44 -070036import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
37import org.onosproject.store.cluster.messaging.MessageSubject;
38import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.service.Serializer;
Jian Li60804322015-12-02 14:46:31 -080040import org.slf4j.Logger;
Jian Li7d180c52016-02-01 21:53:08 -080041import org.slf4j.LoggerFactory;
Jian Li60804322015-12-02 14:46:31 -080042
Jian Li7d180c52016-02-01 21:53:08 -080043import java.util.Map;
Jian Li60804322015-12-02 14:46:31 -080044import java.util.Optional;
Jian Lic5cb4a12016-02-03 23:24:42 -080045import java.util.Set;
Jian Li23906cc2016-03-31 11:16:44 -070046import java.util.concurrent.CompletableFuture;
47import java.util.concurrent.ExecutorService;
48import java.util.concurrent.Executors;
Jian Li85060ac2016-02-04 09:58:56 -080049import java.util.stream.Collectors;
Jian Li60804322015-12-02 14:46:31 -080050
Jian Li23906cc2016-03-31 11:16:44 -070051import static com.google.common.base.Preconditions.checkArgument;
52import static org.onlab.util.Tools.groupedThreads;
53import static org.onosproject.cpman.ControlResource.CONTROL_MESSAGE_METRICS;
54import static org.onosproject.cpman.ControlResource.CPU_METRICS;
55import static org.onosproject.cpman.ControlResource.DISK_METRICS;
56import static org.onosproject.cpman.ControlResource.MEMORY_METRICS;
57import static org.onosproject.cpman.ControlResource.NETWORK_METRICS;
58import static org.onosproject.cpman.ControlResource.Type;
Jian Li60804322015-12-02 14:46:31 -080059
60/**
61 * Control plane monitoring service class.
62 */
63@Component(immediate = true)
64@Service
65public class ControlPlaneMonitor implements ControlPlaneMonitorService {
66
Jian Li7d180c52016-02-01 21:53:08 -080067 private final Logger log = LoggerFactory.getLogger(getClass());
68 private MetricsDatabase cpuMetrics;
69 private MetricsDatabase memoryMetrics;
70 private Map<DeviceId, MetricsDatabase> controlMessageMap;
71 private Map<String, MetricsDatabase> diskMetricsMap;
72 private Map<String, MetricsDatabase> networkMetricsMap;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
75 protected ClusterService clusterService;
76
Jian Li23906cc2016-03-31 11:16:44 -070077 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 protected ClusterCommunicationService communicationService;
79
Jian Li85060ac2016-02-04 09:58:56 -080080 private static final Set RESOURCE_TYPE_SET =
81 ImmutableSet.of(Type.CONTROL_MESSAGE, Type.DISK, Type.NETWORK);
Jian Li7d180c52016-02-01 21:53:08 -080082
Jian Li23906cc2016-03-31 11:16:44 -070083 private static final MessageSubject CONTROL_STATS =
84 new MessageSubject("control-plane-stats");
85
Jian Li7d180c52016-02-01 21:53:08 -080086 private Map<ControlMetricType, Double> cpuBuf;
87 private Map<ControlMetricType, Double> memoryBuf;
88 private Map<String, Map<ControlMetricType, Double>> diskBuf;
89 private Map<String, Map<ControlMetricType, Double>> networkBuf;
90 private Map<DeviceId, Map<ControlMetricType, Double>> ctrlMsgBuf;
Jian Li60804322015-12-02 14:46:31 -080091
Jian Li85060ac2016-02-04 09:58:56 -080092 private Map<Type, Set<String>> availableResourceMap;
93 private Set<DeviceId> availableDeviceIdSet;
94
Jian Li23906cc2016-03-31 11:16:44 -070095 private ExecutorService messageHandlingExecutor;
96
97 private static final String METRIC_TYPE_NULL = "Control metric type cannot be null";
98
99 private static final Serializer SERIALIZER = Serializer
100 .using(new KryoNamespace.Builder()
101 .register(KryoNamespaces.API)
102 .register(ControlMetricsRequest.class)
103 .register(DefaultControlLoad.class)
104 .register(DefaultMetricsDatabase.class)
105 .register(ControlMetricType.class)
106 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());
107
Jian Li60804322015-12-02 14:46:31 -0800108 @Activate
109 public void activate() {
Jian Li85060ac2016-02-04 09:58:56 -0800110 cpuMetrics = genMDbBuilder(Type.CPU, CPU_METRICS);
111 memoryMetrics = genMDbBuilder(Type.MEMORY, MEMORY_METRICS);
112 controlMessageMap = Maps.newConcurrentMap();
113 diskMetricsMap = Maps.newConcurrentMap();
114 networkMetricsMap = Maps.newConcurrentMap();
Jian Li7d180c52016-02-01 21:53:08 -0800115
Jian Li85060ac2016-02-04 09:58:56 -0800116 cpuBuf = Maps.newConcurrentMap();
117 memoryBuf = Maps.newConcurrentMap();
118 diskBuf = Maps.newConcurrentMap();
119 networkBuf = Maps.newConcurrentMap();
120 ctrlMsgBuf = Maps.newConcurrentMap();
121
122 availableResourceMap = Maps.newConcurrentMap();
123 availableDeviceIdSet = Sets.newConcurrentHashSet();
Jian Li7d180c52016-02-01 21:53:08 -0800124
Jian Li23906cc2016-03-31 11:16:44 -0700125 messageHandlingExecutor = Executors.newSingleThreadScheduledExecutor(
126 groupedThreads("onos/app/cpman", "message-handlers"));
127
128 communicationService.addSubscriber(CONTROL_STATS,
129 SERIALIZER::decode, this::handleRequest, messageHandlingExecutor);
130
Jian Li7d180c52016-02-01 21:53:08 -0800131 log.info("Started");
Jian Li60804322015-12-02 14:46:31 -0800132 }
133
134 @Deactivate
135 public void deactivate() {
Jian Li7d180c52016-02-01 21:53:08 -0800136
137 // TODO: need to handle the mdb close.
138 cpuBuf.clear();
139 memoryBuf.clear();
140 diskBuf.clear();
141 networkBuf.clear();
142 ctrlMsgBuf.clear();
143
Jian Li23906cc2016-03-31 11:16:44 -0700144 communicationService.removeSubscriber(CONTROL_STATS);
145
Jian Li7d180c52016-02-01 21:53:08 -0800146 log.info("Stopped");
Jian Li60804322015-12-02 14:46:31 -0800147 }
148
Jian Li60804322015-12-02 14:46:31 -0800149 @Override
Jian Lic5cb4a12016-02-03 23:24:42 -0800150 public void updateMetric(ControlMetric cm, int updateIntervalInMinutes,
Jian Li60804322015-12-02 14:46:31 -0800151 Optional<DeviceId> deviceId) {
Jian Li7d180c52016-02-01 21:53:08 -0800152 if (deviceId.isPresent()) {
Jian Li46148902016-01-29 13:33:50 -0800153
Jian Li7d180c52016-02-01 21:53:08 -0800154 // insert a new device entry if we cannot find any
Jian Li85060ac2016-02-04 09:58:56 -0800155 ctrlMsgBuf.putIfAbsent(deviceId.get(), Maps.newConcurrentMap());
Jian Li7d180c52016-02-01 21:53:08 -0800156
157 // update control message metrics
Jian Li85060ac2016-02-04 09:58:56 -0800158 if (CONTROL_MESSAGE_METRICS.contains(cm.metricType())) {
159
160 if (!availableDeviceIdSet.contains(deviceId.get())) {
161 availableDeviceIdSet.add(deviceId.get());
162 }
Jian Li7d180c52016-02-01 21:53:08 -0800163
164 // we will accumulate the metric value into buffer first
165 ctrlMsgBuf.get(deviceId.get()).putIfAbsent(cm.metricType(),
166 (double) cm.metricValue().getLoad());
167
168 // if buffer contains all control message metrics,
169 // we simply set and update the values into MetricsDatabase.
Jian Li85060ac2016-02-04 09:58:56 -0800170 if (ctrlMsgBuf.get(deviceId.get()).keySet()
171 .containsAll(CONTROL_MESSAGE_METRICS)) {
Jian Li7d180c52016-02-01 21:53:08 -0800172 updateControlMessages(ctrlMsgBuf.get(deviceId.get()), deviceId.get());
173 ctrlMsgBuf.get(deviceId.get()).clear();
174 }
175 }
176 } else {
177
178 // update cpu metrics
179 if (CPU_METRICS.contains(cm.metricType())) {
180 cpuBuf.putIfAbsent(cm.metricType(),
181 (double) cm.metricValue().getLoad());
182 if (cpuBuf.keySet().containsAll(CPU_METRICS)) {
183 cpuMetrics.updateMetrics(convertMap(cpuBuf));
184 cpuBuf.clear();
185 }
186 }
187
188 // update memory metrics
189 if (MEMORY_METRICS.contains(cm.metricType())) {
190 memoryBuf.putIfAbsent(cm.metricType(),
191 (double) cm.metricValue().getLoad());
192 if (memoryBuf.keySet().containsAll(MEMORY_METRICS)) {
193 memoryMetrics.updateMetrics(convertMap(memoryBuf));
194 memoryBuf.clear();
195 }
196 }
197 }
Jian Li60804322015-12-02 14:46:31 -0800198 }
199
200 @Override
Jian Lic5cb4a12016-02-03 23:24:42 -0800201 public void updateMetric(ControlMetric cm, int updateIntervalInMinutes,
Jian Lie044d1a2016-01-25 09:01:20 -0800202 String resourceName) {
Jian Li7d180c52016-02-01 21:53:08 -0800203 // update disk metrics
204 if (DISK_METRICS.contains(cm.metricType())) {
Jian Li85060ac2016-02-04 09:58:56 -0800205 diskBuf.putIfAbsent(resourceName, Maps.newConcurrentMap());
206
207 availableResourceMap.putIfAbsent(Type.DISK, Sets.newHashSet());
208 availableResourceMap.computeIfPresent(Type.DISK, (k, v) -> {
209 v.add(resourceName);
210 return v;
211 });
212
Jian Li7d180c52016-02-01 21:53:08 -0800213 diskBuf.get(resourceName).putIfAbsent(cm.metricType(),
214 (double) cm.metricValue().getLoad());
215 if (diskBuf.get(resourceName).keySet().containsAll(DISK_METRICS)) {
216 updateDiskMetrics(diskBuf.get(resourceName), resourceName);
217 diskBuf.clear();
218 }
219 }
Jian Lie044d1a2016-01-25 09:01:20 -0800220
Jian Li7d180c52016-02-01 21:53:08 -0800221 // update network metrics
222 if (NETWORK_METRICS.contains(cm.metricType())) {
Jian Li85060ac2016-02-04 09:58:56 -0800223 networkBuf.putIfAbsent(resourceName, Maps.newConcurrentMap());
224
225 availableResourceMap.putIfAbsent(Type.NETWORK, Sets.newHashSet());
226 availableResourceMap.computeIfPresent(Type.NETWORK, (k, v) -> {
227 v.add(resourceName);
228 return v;
229 });
230
Jian Li7d180c52016-02-01 21:53:08 -0800231 networkBuf.get(resourceName).putIfAbsent(cm.metricType(),
232 (double) cm.metricValue().getLoad());
233 if (networkBuf.get(resourceName).keySet().containsAll(NETWORK_METRICS)) {
234 updateNetworkMetrics(networkBuf.get(resourceName), resourceName);
235 networkBuf.clear();
236 }
237 }
Jian Lie044d1a2016-01-25 09:01:20 -0800238 }
239
240 @Override
Jian Li23906cc2016-03-31 11:16:44 -0700241 public ControlLoad getLocalLoad(ControlMetricType type,
242 Optional<DeviceId> deviceId) {
243 if (deviceId.isPresent()) {
244 if (CONTROL_MESSAGE_METRICS.contains(type) &&
245 availableDeviceIdSet.contains(deviceId.get())) {
246 return new DefaultControlLoad(controlMessageMap.get(deviceId.get()), type);
Jian Li7d180c52016-02-01 21:53:08 -0800247 }
248 } else {
Jian Li23906cc2016-03-31 11:16:44 -0700249 // returns controlLoad of CPU metrics
250 if (CPU_METRICS.contains(type)) {
251 return new DefaultControlLoad(cpuMetrics, type);
252 }
253
254 // returns memoryLoad of memory metrics
255 if (MEMORY_METRICS.contains(type)) {
256 return new DefaultControlLoad(memoryMetrics, type);
257 }
Jian Li7d180c52016-02-01 21:53:08 -0800258 }
Jian Li60804322015-12-02 14:46:31 -0800259 return null;
260 }
261
262 @Override
Jian Li23906cc2016-03-31 11:16:44 -0700263 public ControlLoad getLocalLoad(ControlMetricType type, String resourceName) {
264 if (DISK_METRICS.contains(type) &&
265 availableResources(Type.DISK).contains(resourceName)) {
266 return new DefaultControlLoad(diskMetricsMap.get(resourceName), type);
267 }
Jian Li7d180c52016-02-01 21:53:08 -0800268
Jian Li23906cc2016-03-31 11:16:44 -0700269 if (NETWORK_METRICS.contains(type) &&
270 availableResources(Type.NETWORK).contains(resourceName)) {
271 return new DefaultControlLoad(networkMetricsMap.get(resourceName), type);
Jian Li7d180c52016-02-01 21:53:08 -0800272 }
Jian Li60804322015-12-02 14:46:31 -0800273 return null;
274 }
Jian Li7d180c52016-02-01 21:53:08 -0800275
Jian Li85060ac2016-02-04 09:58:56 -0800276 @Override
Jian Li23906cc2016-03-31 11:16:44 -0700277 public CompletableFuture<ControlLoad> getRemoteLoad(NodeId nodeId,
278 ControlMetricType type,
279 Optional<DeviceId> deviceId) {
280 return communicationService.sendAndReceive(createRequest(type, deviceId),
281 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
282 }
283
284 @Override
285 public CompletableFuture<ControlLoad> getRemoteLoad(NodeId nodeId,
286 ControlMetricType type,
287 String resourceName) {
288 return communicationService.sendAndReceive(createRequest(type, resourceName),
289 CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
290 }
291
292
293 @Override
Jian Li85060ac2016-02-04 09:58:56 -0800294 public Set<String> availableResources(Type resourceType) {
295 if (RESOURCE_TYPE_SET.contains(resourceType)) {
296 if (Type.CONTROL_MESSAGE.equals(resourceType)) {
297 return availableDeviceIdSet.stream().map(id ->
298 id.toString()).collect(Collectors.toSet());
299 } else {
300 return availableResourceMap.get(resourceType);
301 }
302 }
303 return null;
304 }
305
306 private MetricsDatabase genMDbBuilder(Type resourceType,
Jian Lic5cb4a12016-02-03 23:24:42 -0800307 Set<ControlMetricType> metricTypes) {
Jian Li7d180c52016-02-01 21:53:08 -0800308 MetricsDatabase.Builder builder = new DefaultMetricsDatabase.Builder();
Jian Li85060ac2016-02-04 09:58:56 -0800309 builder.withMetricName(resourceType.toString());
Jian Li7d180c52016-02-01 21:53:08 -0800310 metricTypes.forEach(type -> builder.addMetricType(type.toString()));
311 return builder.build();
312 }
313
314 private void updateNetworkMetrics(Map<ControlMetricType, Double> metricMap,
315 String resName) {
Jian Li85060ac2016-02-04 09:58:56 -0800316 networkMetricsMap.putIfAbsent(resName,
317 genMDbBuilder(Type.NETWORK, NETWORK_METRICS));
Jian Li7d180c52016-02-01 21:53:08 -0800318 networkMetricsMap.get(resName).updateMetrics(convertMap(metricMap));
319 }
320
321 private void updateDiskMetrics(Map<ControlMetricType, Double> metricMap,
322 String resName) {
Jian Li85060ac2016-02-04 09:58:56 -0800323 diskMetricsMap.putIfAbsent(resName, genMDbBuilder(Type.DISK, DISK_METRICS));
Jian Li7d180c52016-02-01 21:53:08 -0800324 diskMetricsMap.get(resName).updateMetrics(convertMap(metricMap));
325 }
326
327 private void updateControlMessages(Map<ControlMetricType, Double> metricMap,
328 DeviceId devId) {
Jian Li85060ac2016-02-04 09:58:56 -0800329 controlMessageMap.putIfAbsent(devId,
330 genMDbBuilder(Type.CONTROL_MESSAGE, CONTROL_MESSAGE_METRICS));
Jian Li7d180c52016-02-01 21:53:08 -0800331 controlMessageMap.get(devId).updateMetrics(convertMap(metricMap));
332 }
333
334 private Map convertMap(Map<ControlMetricType, Double> map) {
Jian Li85060ac2016-02-04 09:58:56 -0800335 Map newMap = Maps.newConcurrentMap();
Jian Li7d180c52016-02-01 21:53:08 -0800336 map.forEach((k, v) -> newMap.putIfAbsent(k.toString(), v));
337 return newMap;
338 }
Jian Li23906cc2016-03-31 11:16:44 -0700339
340 private CompletableFuture<ControlLoad> handleRequest(ControlMetricsRequest request) {
341
342 checkArgument(request.getType() != null, METRIC_TYPE_NULL);
343
344 ControlLoad load;
345 if (request.getResourceName() != null) {
346 load = getLocalLoad(request.getType(), request.getResourceName());
347 } else {
348 load = getLocalLoad(request.getType(), request.getDeviceId());
349 }
350 return CompletableFuture.completedFuture(load);
351 }
352
353 private ControlMetricsRequest createRequest(ControlMetricType type,
354 Optional<DeviceId> deviceId) {
355 return new ControlMetricsRequest(type, deviceId);
356 }
357
358 private ControlMetricsRequest createRequest(ControlMetricType type,
359 String resourceName) {
360 return new ControlMetricsRequest(type, resourceName);
361 }
362}