Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 1 | /* |
Brian O'Connor | a09fe5b | 2017-08-03 21:12:30 -0700 | [diff] [blame] | 2 | * Copyright 2016-present Open Networking Foundation |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
Jian Li | 6b86a76 | 2016-01-29 09:30:40 -0800 | [diff] [blame] | 16 | package org.onosproject.cpman.impl; |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 17 | |
Jian Li | 9f3a885 | 2016-04-07 13:37:39 -0700 | [diff] [blame] | 18 | import com.google.common.collect.ImmutableMap; |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 19 | import com.google.common.collect.ImmutableSet; |
Jian Li | 85060ac | 2016-02-04 09:58:56 -0800 | [diff] [blame] | 20 | import com.google.common.collect.Maps; |
| 21 | import com.google.common.collect.Sets; |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 22 | import org.onlab.util.KryoNamespace; |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 23 | import org.onosproject.cluster.ClusterService; |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 24 | import org.onosproject.cluster.NodeId; |
Jian Li | 6b86a76 | 2016-01-29 09:30:40 -0800 | [diff] [blame] | 25 | import org.onosproject.cpman.ControlLoad; |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 26 | import org.onosproject.cpman.ControlLoadSnapshot; |
Jian Li | 6b86a76 | 2016-01-29 09:30:40 -0800 | [diff] [blame] | 27 | import org.onosproject.cpman.ControlMetric; |
| 28 | import org.onosproject.cpman.ControlMetricType; |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 29 | import org.onosproject.cpman.ControlMetricsRequest; |
Jian Li | 6b86a76 | 2016-01-29 09:30:40 -0800 | [diff] [blame] | 30 | import org.onosproject.cpman.ControlPlaneMonitorService; |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 31 | import org.onosproject.cpman.ControlResource; |
| 32 | import org.onosproject.cpman.ControlResourceRequest; |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 33 | import org.onosproject.cpman.MetricsDatabase; |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 34 | import org.onosproject.net.DeviceId; |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 35 | import org.onosproject.store.cluster.messaging.ClusterCommunicationService; |
| 36 | import org.onosproject.store.cluster.messaging.MessageSubject; |
| 37 | import org.onosproject.store.serializers.KryoNamespaces; |
| 38 | import org.onosproject.store.service.Serializer; |
Ray Milkey | d84f89b | 2018-08-17 14:54:17 -0700 | [diff] [blame^] | 39 | import org.osgi.service.component.annotations.Activate; |
| 40 | import org.osgi.service.component.annotations.Component; |
| 41 | import org.osgi.service.component.annotations.Deactivate; |
| 42 | import org.osgi.service.component.annotations.Reference; |
| 43 | import org.osgi.service.component.annotations.ReferenceCardinality; |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 44 | import org.slf4j.Logger; |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 45 | import org.slf4j.LoggerFactory; |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 46 | |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 47 | import java.util.Map; |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 48 | import java.util.Optional; |
Jian Li | c5cb4a1 | 2016-02-03 23:24:42 -0800 | [diff] [blame] | 49 | import java.util.Set; |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 50 | import java.util.concurrent.CompletableFuture; |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 51 | import java.util.concurrent.TimeUnit; |
Jian Li | 85060ac | 2016-02-04 09:58:56 -0800 | [diff] [blame] | 52 | import java.util.stream.Collectors; |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 53 | |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 54 | import static com.google.common.base.Preconditions.checkArgument; |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 55 | import static org.onosproject.cpman.ControlResource.CONTROL_MESSAGE_METRICS; |
| 56 | import static org.onosproject.cpman.ControlResource.CPU_METRICS; |
| 57 | import static org.onosproject.cpman.ControlResource.DISK_METRICS; |
| 58 | import static org.onosproject.cpman.ControlResource.MEMORY_METRICS; |
| 59 | import static org.onosproject.cpman.ControlResource.NETWORK_METRICS; |
| 60 | import static org.onosproject.cpman.ControlResource.Type; |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 61 | |
| 62 | /** |
| 63 | * Control plane monitoring service class. |
| 64 | */ |
Ray Milkey | d84f89b | 2018-08-17 14:54:17 -0700 | [diff] [blame^] | 65 | @Component(immediate = true, service = ControlPlaneMonitorService.class) |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 66 | public class ControlPlaneMonitor implements ControlPlaneMonitorService { |
| 67 | |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 68 | private final Logger log = LoggerFactory.getLogger(getClass()); |
| 69 | private MetricsDatabase cpuMetrics; |
| 70 | private MetricsDatabase memoryMetrics; |
| 71 | private Map<DeviceId, MetricsDatabase> controlMessageMap; |
| 72 | private Map<String, MetricsDatabase> diskMetricsMap; |
| 73 | private Map<String, MetricsDatabase> networkMetricsMap; |
| 74 | |
Ray Milkey | d84f89b | 2018-08-17 14:54:17 -0700 | [diff] [blame^] | 75 | @Reference(cardinality = ReferenceCardinality.MANDATORY) |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 76 | protected ClusterService clusterService; |
| 77 | |
Ray Milkey | d84f89b | 2018-08-17 14:54:17 -0700 | [diff] [blame^] | 78 | @Reference(cardinality = ReferenceCardinality.MANDATORY) |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 79 | protected ClusterCommunicationService communicationService; |
| 80 | |
Jian Li | daf55ea | 2016-04-04 20:38:30 -0700 | [diff] [blame] | 81 | private static final String DEFAULT_RESOURCE = "default"; |
| 82 | |
Jian Li | 85060ac | 2016-02-04 09:58:56 -0800 | [diff] [blame] | 83 | private static final Set RESOURCE_TYPE_SET = |
| 84 | ImmutableSet.of(Type.CONTROL_MESSAGE, Type.DISK, Type.NETWORK); |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 85 | |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 86 | private static final MessageSubject CONTROL_STATS = |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 87 | new MessageSubject("control-plane-stats"); |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 88 | |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 89 | private static final MessageSubject CONTROL_RESOURCE = |
| 90 | new MessageSubject("control-plane-resources"); |
| 91 | |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 92 | private Map<ControlMetricType, Double> cpuBuf; |
| 93 | private Map<ControlMetricType, Double> memoryBuf; |
| 94 | private Map<String, Map<ControlMetricType, Double>> diskBuf; |
| 95 | private Map<String, Map<ControlMetricType, Double>> networkBuf; |
| 96 | private Map<DeviceId, Map<ControlMetricType, Double>> ctrlMsgBuf; |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 97 | |
Jian Li | 85060ac | 2016-02-04 09:58:56 -0800 | [diff] [blame] | 98 | private Map<Type, Set<String>> availableResourceMap; |
| 99 | private Set<DeviceId> availableDeviceIdSet; |
| 100 | |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 101 | private static final String METRIC_TYPE_NULL = "Control metric type cannot be null"; |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 102 | private static final String RESOURCE_TYPE_NULL = "Control resource type cannot be null"; |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 103 | |
| 104 | private static final Serializer SERIALIZER = Serializer |
| 105 | .using(new KryoNamespace.Builder() |
| 106 | .register(KryoNamespaces.API) |
| 107 | .register(ControlMetricsRequest.class) |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 108 | .register(ControlResourceRequest.class) |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 109 | .register(ControlLoadSnapshot.class) |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 110 | .register(ControlMetricType.class) |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 111 | .register(ControlResource.Type.class) |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 112 | .register(TimeUnit.class) |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 113 | .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build()); |
| 114 | |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 115 | @Activate |
| 116 | public void activate() { |
Jian Li | daf55ea | 2016-04-04 20:38:30 -0700 | [diff] [blame] | 117 | cpuMetrics = genMDbBuilder(DEFAULT_RESOURCE, Type.CPU, CPU_METRICS); |
| 118 | memoryMetrics = genMDbBuilder(DEFAULT_RESOURCE, Type.MEMORY, MEMORY_METRICS); |
Jian Li | 85060ac | 2016-02-04 09:58:56 -0800 | [diff] [blame] | 119 | controlMessageMap = Maps.newConcurrentMap(); |
| 120 | diskMetricsMap = Maps.newConcurrentMap(); |
| 121 | networkMetricsMap = Maps.newConcurrentMap(); |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 122 | |
Jian Li | 85060ac | 2016-02-04 09:58:56 -0800 | [diff] [blame] | 123 | cpuBuf = Maps.newConcurrentMap(); |
| 124 | memoryBuf = Maps.newConcurrentMap(); |
| 125 | diskBuf = Maps.newConcurrentMap(); |
| 126 | networkBuf = Maps.newConcurrentMap(); |
| 127 | ctrlMsgBuf = Maps.newConcurrentMap(); |
| 128 | |
| 129 | availableResourceMap = Maps.newConcurrentMap(); |
| 130 | availableDeviceIdSet = Sets.newConcurrentHashSet(); |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 131 | |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 132 | communicationService.<ControlMetricsRequest, ControlLoadSnapshot>addSubscriber(CONTROL_STATS, |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 133 | SERIALIZER::decode, this::handleMetricsRequest, SERIALIZER::encode); |
| 134 | |
| 135 | communicationService.<ControlResourceRequest, Set<String>>addSubscriber(CONTROL_RESOURCE, |
| 136 | SERIALIZER::decode, this::handleResourceRequest, SERIALIZER::encode); |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 137 | |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 138 | log.info("Started"); |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 139 | } |
| 140 | |
| 141 | @Deactivate |
| 142 | public void deactivate() { |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 143 | |
| 144 | // TODO: need to handle the mdb close. |
| 145 | cpuBuf.clear(); |
| 146 | memoryBuf.clear(); |
| 147 | diskBuf.clear(); |
| 148 | networkBuf.clear(); |
| 149 | ctrlMsgBuf.clear(); |
| 150 | |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 151 | communicationService.removeSubscriber(CONTROL_STATS); |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 152 | communicationService.removeSubscriber(CONTROL_RESOURCE); |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 153 | |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 154 | log.info("Stopped"); |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 155 | } |
| 156 | |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 157 | @Override |
Jian Li | c5cb4a1 | 2016-02-03 23:24:42 -0800 | [diff] [blame] | 158 | public void updateMetric(ControlMetric cm, int updateIntervalInMinutes, |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 159 | Optional<DeviceId> deviceId) { |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 160 | if (deviceId.isPresent()) { |
Jian Li | 4614890 | 2016-01-29 13:33:50 -0800 | [diff] [blame] | 161 | |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 162 | // insert a new device entry if we cannot find any |
Jian Li | 85060ac | 2016-02-04 09:58:56 -0800 | [diff] [blame] | 163 | ctrlMsgBuf.putIfAbsent(deviceId.get(), Maps.newConcurrentMap()); |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 164 | |
| 165 | // update control message metrics |
Jian Li | 85060ac | 2016-02-04 09:58:56 -0800 | [diff] [blame] | 166 | if (CONTROL_MESSAGE_METRICS.contains(cm.metricType())) { |
| 167 | |
| 168 | if (!availableDeviceIdSet.contains(deviceId.get())) { |
| 169 | availableDeviceIdSet.add(deviceId.get()); |
| 170 | } |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 171 | |
| 172 | // we will accumulate the metric value into buffer first |
| 173 | ctrlMsgBuf.get(deviceId.get()).putIfAbsent(cm.metricType(), |
| 174 | (double) cm.metricValue().getLoad()); |
| 175 | |
| 176 | // if buffer contains all control message metrics, |
| 177 | // we simply set and update the values into MetricsDatabase. |
Jian Li | 85060ac | 2016-02-04 09:58:56 -0800 | [diff] [blame] | 178 | if (ctrlMsgBuf.get(deviceId.get()).keySet() |
| 179 | .containsAll(CONTROL_MESSAGE_METRICS)) { |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 180 | updateControlMessages(ctrlMsgBuf.get(deviceId.get()), deviceId.get()); |
Jian Li | 1aa0782 | 2016-04-19 17:58:02 -0700 | [diff] [blame] | 181 | ctrlMsgBuf.clear(); |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 182 | } |
| 183 | } |
| 184 | } else { |
| 185 | |
| 186 | // update cpu metrics |
| 187 | if (CPU_METRICS.contains(cm.metricType())) { |
| 188 | cpuBuf.putIfAbsent(cm.metricType(), |
| 189 | (double) cm.metricValue().getLoad()); |
| 190 | if (cpuBuf.keySet().containsAll(CPU_METRICS)) { |
| 191 | cpuMetrics.updateMetrics(convertMap(cpuBuf)); |
| 192 | cpuBuf.clear(); |
| 193 | } |
| 194 | } |
| 195 | |
| 196 | // update memory metrics |
| 197 | if (MEMORY_METRICS.contains(cm.metricType())) { |
| 198 | memoryBuf.putIfAbsent(cm.metricType(), |
| 199 | (double) cm.metricValue().getLoad()); |
| 200 | if (memoryBuf.keySet().containsAll(MEMORY_METRICS)) { |
| 201 | memoryMetrics.updateMetrics(convertMap(memoryBuf)); |
| 202 | memoryBuf.clear(); |
| 203 | } |
| 204 | } |
| 205 | } |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 206 | } |
| 207 | |
| 208 | @Override |
Jian Li | c5cb4a1 | 2016-02-03 23:24:42 -0800 | [diff] [blame] | 209 | public void updateMetric(ControlMetric cm, int updateIntervalInMinutes, |
Jian Li | e044d1a | 2016-01-25 09:01:20 -0800 | [diff] [blame] | 210 | String resourceName) { |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 211 | // update disk metrics |
| 212 | if (DISK_METRICS.contains(cm.metricType())) { |
Jian Li | 85060ac | 2016-02-04 09:58:56 -0800 | [diff] [blame] | 213 | diskBuf.putIfAbsent(resourceName, Maps.newConcurrentMap()); |
| 214 | |
| 215 | availableResourceMap.putIfAbsent(Type.DISK, Sets.newHashSet()); |
| 216 | availableResourceMap.computeIfPresent(Type.DISK, (k, v) -> { |
| 217 | v.add(resourceName); |
| 218 | return v; |
| 219 | }); |
| 220 | |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 221 | diskBuf.get(resourceName).putIfAbsent(cm.metricType(), |
| 222 | (double) cm.metricValue().getLoad()); |
| 223 | if (diskBuf.get(resourceName).keySet().containsAll(DISK_METRICS)) { |
| 224 | updateDiskMetrics(diskBuf.get(resourceName), resourceName); |
| 225 | diskBuf.clear(); |
| 226 | } |
| 227 | } |
Jian Li | e044d1a | 2016-01-25 09:01:20 -0800 | [diff] [blame] | 228 | |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 229 | // update network metrics |
| 230 | if (NETWORK_METRICS.contains(cm.metricType())) { |
Jian Li | 85060ac | 2016-02-04 09:58:56 -0800 | [diff] [blame] | 231 | networkBuf.putIfAbsent(resourceName, Maps.newConcurrentMap()); |
| 232 | |
| 233 | availableResourceMap.putIfAbsent(Type.NETWORK, Sets.newHashSet()); |
| 234 | availableResourceMap.computeIfPresent(Type.NETWORK, (k, v) -> { |
| 235 | v.add(resourceName); |
| 236 | return v; |
| 237 | }); |
| 238 | |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 239 | networkBuf.get(resourceName).putIfAbsent(cm.metricType(), |
| 240 | (double) cm.metricValue().getLoad()); |
| 241 | if (networkBuf.get(resourceName).keySet().containsAll(NETWORK_METRICS)) { |
| 242 | updateNetworkMetrics(networkBuf.get(resourceName), resourceName); |
| 243 | networkBuf.clear(); |
| 244 | } |
| 245 | } |
Jian Li | e044d1a | 2016-01-25 09:01:20 -0800 | [diff] [blame] | 246 | } |
| 247 | |
| 248 | @Override |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 249 | public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId, |
| 250 | ControlMetricType type, |
| 251 | Optional<DeviceId> deviceId) { |
| 252 | if (clusterService.getLocalNode().id().equals(nodeId)) { |
| 253 | return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, deviceId))); |
| 254 | } else { |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 255 | return communicationService.sendAndReceive(createMetricsRequest(type, deviceId), |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 256 | CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId); |
| 257 | } |
| 258 | } |
| 259 | |
| 260 | @Override |
| 261 | public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId, |
| 262 | ControlMetricType type, |
| 263 | String resourceName) { |
| 264 | if (clusterService.getLocalNode().id().equals(nodeId)) { |
| 265 | return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, resourceName))); |
| 266 | } else { |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 267 | return communicationService.sendAndReceive(createMetricsRequest(type, resourceName), |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 268 | CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId); |
| 269 | } |
| 270 | } |
| 271 | |
| 272 | @Override |
| 273 | public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId, |
| 274 | ControlMetricType type, |
| 275 | int duration, TimeUnit unit, |
| 276 | Optional<DeviceId> deviceId) { |
| 277 | if (clusterService.getLocalNode().id().equals(nodeId)) { |
| 278 | return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, deviceId), duration, unit)); |
| 279 | } else { |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 280 | return communicationService.sendAndReceive(createMetricsRequest(type, duration, unit, deviceId), |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 281 | CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId); |
| 282 | } |
| 283 | } |
| 284 | |
| 285 | @Override |
| 286 | public CompletableFuture<ControlLoadSnapshot> getLoad(NodeId nodeId, |
| 287 | ControlMetricType type, |
| 288 | int duration, TimeUnit unit, |
| 289 | String resourceName) { |
| 290 | if (clusterService.getLocalNode().id().equals(nodeId)) { |
| 291 | return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, resourceName), duration, unit)); |
| 292 | } else { |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 293 | return communicationService.sendAndReceive(createMetricsRequest(type, duration, unit, resourceName), |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 294 | CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId); |
| 295 | } |
| 296 | } |
| 297 | |
| 298 | @Override |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 299 | public CompletableFuture<Set<String>> availableResources(NodeId nodeId, |
| 300 | Type resourceType) { |
| 301 | if (clusterService.getLocalNode().id().equals(nodeId)) { |
| 302 | Set<String> resources = getLocalAvailableResources(resourceType); |
| 303 | return CompletableFuture.completedFuture(resources); |
| 304 | } else { |
| 305 | return communicationService.sendAndReceive(createResourceRequest(resourceType), |
| 306 | CONTROL_RESOURCE, SERIALIZER::encode, SERIALIZER::decode, nodeId); |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 307 | } |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 308 | } |
| 309 | |
| 310 | /** |
| 311 | * Builds and returns metric database instance with given resource name, |
| 312 | * resource type and metric type. |
| 313 | * |
| 314 | * @param resourceName resource name |
| 315 | * @param resourceType resource type |
| 316 | * @param metricTypes metric type |
| 317 | * @return metric database instance |
| 318 | */ |
| 319 | private MetricsDatabase genMDbBuilder(String resourceName, |
| 320 | Type resourceType, |
| 321 | Set<ControlMetricType> metricTypes) { |
| 322 | MetricsDatabase.Builder builder = new DefaultMetricsDatabase.Builder(); |
| 323 | builder.withMetricName(resourceType.toString()); |
| 324 | builder.withResourceName(resourceName); |
| 325 | metricTypes.forEach(type -> builder.addMetricType(type.toString())); |
| 326 | return builder.build(); |
| 327 | } |
| 328 | |
| 329 | /** |
| 330 | * Updates network metrics with given metric map and resource name. |
| 331 | * |
| 332 | * @param metricMap a metric map which is comprised of metric type and value |
| 333 | * @param resourceName resource name |
| 334 | */ |
| 335 | private void updateNetworkMetrics(Map<ControlMetricType, Double> metricMap, |
| 336 | String resourceName) { |
Jian Li | 1aa0782 | 2016-04-19 17:58:02 -0700 | [diff] [blame] | 337 | if (!networkMetricsMap.containsKey(resourceName)) { |
| 338 | networkMetricsMap.put(resourceName, genMDbBuilder(resourceName, |
| 339 | Type.NETWORK, NETWORK_METRICS)); |
| 340 | } |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 341 | networkMetricsMap.get(resourceName).updateMetrics(convertMap(metricMap)); |
| 342 | } |
| 343 | |
| 344 | /** |
| 345 | * Updates disk metrics with given metric map and resource name. |
| 346 | * |
| 347 | * @param metricMap a metric map which is comprised of metric type and value |
| 348 | * @param resourceName resource name |
| 349 | */ |
| 350 | private void updateDiskMetrics(Map<ControlMetricType, Double> metricMap, |
| 351 | String resourceName) { |
Jian Li | 1aa0782 | 2016-04-19 17:58:02 -0700 | [diff] [blame] | 352 | if (!diskMetricsMap.containsKey(resourceName)) { |
| 353 | diskMetricsMap.put(resourceName, genMDbBuilder(resourceName, |
| 354 | Type.DISK, DISK_METRICS)); |
| 355 | } |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 356 | diskMetricsMap.get(resourceName).updateMetrics(convertMap(metricMap)); |
| 357 | } |
| 358 | |
| 359 | /** |
| 360 | * Updates control message metrics with given metric map and device identifier. |
| 361 | * |
| 362 | * @param metricMap a metric map which is comprised of metric type and value |
| 363 | * @param deviceId device identifier |
| 364 | */ |
| 365 | private void updateControlMessages(Map<ControlMetricType, Double> metricMap, |
| 366 | DeviceId deviceId) { |
Jian Li | 1aa0782 | 2016-04-19 17:58:02 -0700 | [diff] [blame] | 367 | if (!controlMessageMap.containsKey(deviceId)) { |
| 368 | controlMessageMap.put(deviceId, genMDbBuilder(deviceId.toString(), |
| 369 | Type.CONTROL_MESSAGE, CONTROL_MESSAGE_METRICS)); |
| 370 | } |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 371 | controlMessageMap.get(deviceId).updateMetrics(convertMap(metricMap)); |
| 372 | } |
| 373 | |
| 374 | /** |
| 375 | * Converts metric map into a new map which contains string formatted metric type as key. |
| 376 | * |
| 377 | * @param metricMap metric map in which ControlMetricType is key |
| 378 | * @return a new map in which string formatted metric type is key |
| 379 | */ |
| 380 | private Map<String, Double> convertMap(Map<ControlMetricType, Double> metricMap) { |
| 381 | if (metricMap == null) { |
| 382 | return ImmutableMap.of(); |
| 383 | } |
| 384 | Map newMap = Maps.newConcurrentMap(); |
| 385 | metricMap.forEach((k, v) -> newMap.putIfAbsent(k.toString(), v)); |
| 386 | return newMap; |
| 387 | } |
| 388 | |
| 389 | /** |
| 390 | * Handles control metric request from remote node. |
| 391 | * |
| 392 | * @param request control metric request |
| 393 | * @return completable future object of control load snapshot |
| 394 | */ |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 395 | private CompletableFuture<ControlLoadSnapshot> |
| 396 | handleMetricsRequest(ControlMetricsRequest request) { |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 397 | |
| 398 | checkArgument(request.getType() != null, METRIC_TYPE_NULL); |
| 399 | |
| 400 | ControlLoad load; |
| 401 | if (request.getResourceName() != null && request.getUnit() != null) { |
| 402 | load = getLocalLoad(request.getType(), request.getResourceName()); |
| 403 | } else { |
| 404 | load = getLocalLoad(request.getType(), request.getDeviceId()); |
| 405 | } |
| 406 | |
| 407 | long average; |
| 408 | if (request.getUnit() != null) { |
| 409 | average = load.average(request.getDuration(), request.getUnit()); |
| 410 | } else { |
| 411 | average = load.average(); |
| 412 | } |
| 413 | ControlLoadSnapshot resp = |
| 414 | new ControlLoadSnapshot(load.latest(), average, load.time()); |
| 415 | return CompletableFuture.completedFuture(resp); |
| 416 | } |
| 417 | |
| 418 | /** |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 419 | * Handles control resource request from remote node. |
| 420 | * |
| 421 | * @param request control resource type |
| 422 | * @return completable future object of control resource set |
| 423 | */ |
| 424 | private CompletableFuture<Set<String>> |
| 425 | handleResourceRequest(ControlResourceRequest request) { |
| 426 | |
| 427 | checkArgument(request.getType() != null, RESOURCE_TYPE_NULL); |
| 428 | |
| 429 | Set<String> resources = getLocalAvailableResources(request.getType()); |
| 430 | return CompletableFuture.completedFuture(resources); |
| 431 | } |
| 432 | |
| 433 | /** |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 434 | * Generates a control metric request. |
| 435 | * |
| 436 | * @param type control metric type |
| 437 | * @param deviceId device identifier |
| 438 | * @return control metric request instance |
| 439 | */ |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 440 | private ControlMetricsRequest createMetricsRequest(ControlMetricType type, |
| 441 | Optional<DeviceId> deviceId) { |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 442 | return new ControlMetricsRequest(type, deviceId); |
| 443 | } |
| 444 | |
| 445 | /** |
| 446 | * Generates a control metric request with given projected time range. |
| 447 | * |
| 448 | * @param type control metric type |
| 449 | * @param duration projected time duration |
| 450 | * @param unit projected time unit |
| 451 | * @param deviceId device identifier |
| 452 | * @return control metric request instance |
| 453 | */ |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 454 | private ControlMetricsRequest createMetricsRequest(ControlMetricType type, |
| 455 | int duration, TimeUnit unit, |
| 456 | Optional<DeviceId> deviceId) { |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 457 | return new ControlMetricsRequest(type, duration, unit, deviceId); |
| 458 | } |
| 459 | |
| 460 | /** |
| 461 | * Generates a control metric request. |
| 462 | * |
| 463 | * @param type control metric type |
| 464 | * @param resourceName resource name |
| 465 | * @return control metric request instance |
| 466 | */ |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 467 | private ControlMetricsRequest createMetricsRequest(ControlMetricType type, |
| 468 | String resourceName) { |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 469 | return new ControlMetricsRequest(type, resourceName); |
| 470 | } |
| 471 | |
| 472 | /** |
| 473 | * Generates a control metric request with given projected time range. |
| 474 | * |
| 475 | * @param type control metric type |
| 476 | * @param duration projected time duration |
| 477 | * @param unit projected time unit |
| 478 | * @param resourceName resource name |
| 479 | * @return control metric request instance |
| 480 | */ |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 481 | private ControlMetricsRequest createMetricsRequest(ControlMetricType type, |
| 482 | int duration, TimeUnit unit, |
| 483 | String resourceName) { |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 484 | return new ControlMetricsRequest(type, duration, unit, resourceName); |
| 485 | } |
| 486 | |
| 487 | /** |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 488 | * Generates a control resource request with given resource type. |
| 489 | * |
| 490 | * @param type control resource type |
| 491 | * @return control resource request instance |
| 492 | */ |
| 493 | private ControlResourceRequest createResourceRequest(ControlResource.Type type) { |
| 494 | return new ControlResourceRequest(type); |
| 495 | } |
| 496 | |
| 497 | /** |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 498 | * Returns a snapshot of control load. |
| 499 | * |
| 500 | * @param cl control load |
| 501 | * @return a snapshot of control load |
| 502 | */ |
| 503 | private ControlLoadSnapshot snapshot(ControlLoad cl) { |
| 504 | if (cl != null) { |
| 505 | return new ControlLoadSnapshot(cl.latest(), cl.average(), cl.time()); |
| 506 | } |
| 507 | return null; |
| 508 | } |
| 509 | |
| 510 | /** |
| 511 | * Returns a snapshot of control load with given projected time range. |
| 512 | * |
| 513 | * @param cl control load |
| 514 | * @param duration projected time duration |
| 515 | * @param unit projected time unit |
| 516 | * @return a snapshot of control load |
| 517 | */ |
| 518 | private ControlLoadSnapshot snapshot(ControlLoad cl, int duration, TimeUnit unit) { |
| 519 | if (cl != null) { |
Jian Li | 1aa0782 | 2016-04-19 17:58:02 -0700 | [diff] [blame] | 520 | |
| 521 | return new ControlLoadSnapshot(cl.latest(), cl.average(duration, unit), |
| 522 | cl.time(), cl.recent(duration, unit)); |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 523 | } |
| 524 | return null; |
| 525 | } |
| 526 | |
| 527 | /** |
| 528 | * Returns local control load. |
| 529 | * |
| 530 | * @param type metric type |
| 531 | * @param deviceId device identifier |
| 532 | * @return control load |
| 533 | */ |
| 534 | private ControlLoad getLocalLoad(ControlMetricType type, |
| 535 | Optional<DeviceId> deviceId) { |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 536 | if (deviceId.isPresent()) { |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 537 | // returns control message stats |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 538 | if (CONTROL_MESSAGE_METRICS.contains(type) && |
| 539 | availableDeviceIdSet.contains(deviceId.get())) { |
| 540 | return new DefaultControlLoad(controlMessageMap.get(deviceId.get()), type); |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 541 | } |
| 542 | } else { |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 543 | // returns controlLoad of CPU metrics |
| 544 | if (CPU_METRICS.contains(type)) { |
| 545 | return new DefaultControlLoad(cpuMetrics, type); |
| 546 | } |
| 547 | |
| 548 | // returns memoryLoad of memory metrics |
| 549 | if (MEMORY_METRICS.contains(type)) { |
| 550 | return new DefaultControlLoad(memoryMetrics, type); |
| 551 | } |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 552 | } |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 553 | return null; |
| 554 | } |
| 555 | |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 556 | /** |
| 557 | * Returns local control load. |
| 558 | * |
| 559 | * @param type metric type |
| 560 | * @param resourceName resource name |
| 561 | * @return control load |
| 562 | */ |
| 563 | private ControlLoad getLocalLoad(ControlMetricType type, String resourceName) { |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 564 | NodeId localNodeId = clusterService.getLocalNode().id(); |
| 565 | |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 566 | // returns disk I/O stats |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 567 | if (DISK_METRICS.contains(type) && |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 568 | availableResourcesSync(localNodeId, Type.DISK).contains(resourceName)) { |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 569 | return new DefaultControlLoad(diskMetricsMap.get(resourceName), type); |
| 570 | } |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 571 | |
Jian Li | 67e1e15 | 2016-04-18 17:52:58 -0700 | [diff] [blame] | 572 | // returns network I/O stats |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 573 | if (NETWORK_METRICS.contains(type) && |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 574 | availableResourcesSync(localNodeId, Type.NETWORK).contains(resourceName)) { |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 575 | return new DefaultControlLoad(networkMetricsMap.get(resourceName), type); |
Jian Li | 7d180c5 | 2016-02-01 21:53:08 -0800 | [diff] [blame] | 576 | } |
Jian Li | 6080432 | 2015-12-02 14:46:31 -0800 | [diff] [blame] | 577 | return null; |
| 578 | } |
Jian Li | 89eeccd | 2016-05-06 02:10:33 -0700 | [diff] [blame] | 579 | |
| 580 | /** |
| 581 | * Obtains the available resource list from local node. |
| 582 | * |
| 583 | * @param resourceType control resource type |
| 584 | * @return a set of available control resource |
| 585 | */ |
| 586 | private Set<String> getLocalAvailableResources(Type resourceType) { |
| 587 | Set<String> resources = ImmutableSet.of(); |
| 588 | if (RESOURCE_TYPE_SET.contains(resourceType)) { |
| 589 | if (Type.CONTROL_MESSAGE.equals(resourceType)) { |
| 590 | resources = ImmutableSet.copyOf(availableDeviceIdSet.stream() |
| 591 | .map(DeviceId::toString).collect(Collectors.toSet())); |
| 592 | } else { |
| 593 | Set<String> res = availableResourceMap.get(resourceType); |
| 594 | resources = res == null ? ImmutableSet.of() : res; |
| 595 | } |
| 596 | } |
| 597 | return resources; |
| 598 | } |
Jian Li | 23906cc | 2016-03-31 11:16:44 -0700 | [diff] [blame] | 599 | } |