blob: c0ac32d7a69f02e5c7dd27eff00d1a0fbcbc8461 [file] [log] [blame]
Carmelo Cascone3977ea42019-02-28 13:43:42 -08001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.general.device.impl;
18
19import com.google.common.collect.Maps;
Carmelo Casconecb60f922019-03-08 10:50:15 -080020import com.google.common.collect.Streams;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080021import com.google.common.util.concurrent.Striped;
22import org.onosproject.mastership.MastershipEvent;
23import org.onosproject.mastership.MastershipListener;
24import org.onosproject.mastership.MastershipService;
25import org.onosproject.net.Device;
26import org.onosproject.net.DeviceId;
27import org.onosproject.net.device.DeviceEvent;
28import org.onosproject.net.device.DeviceListener;
29import org.onosproject.net.device.DeviceProviderService;
30import org.onosproject.net.device.DeviceService;
31import org.onosproject.net.device.PortStatistics;
32import org.onosproject.net.device.PortStatisticsDiscovery;
33import org.slf4j.Logger;
34
35import java.security.SecureRandom;
36import java.util.Collection;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080037import java.util.Objects;
Carmelo Casconecb60f922019-03-08 10:50:15 -080038import java.util.concurrent.ConcurrentMap;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080039import java.util.concurrent.ScheduledExecutorService;
40import java.util.concurrent.ScheduledFuture;
41import java.util.concurrent.TimeUnit;
42import java.util.concurrent.locks.Lock;
Carmelo Casconecb60f922019-03-08 10:50:15 -080043import java.util.stream.StreamSupport;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080044
45import static com.google.common.base.Preconditions.checkArgument;
46import static java.util.concurrent.Executors.newScheduledThreadPool;
47import static org.onlab.util.Tools.groupedThreads;
48import static org.onosproject.provider.general.device.impl.GeneralDeviceProvider.myScheme;
49import static org.slf4j.LoggerFactory.getLogger;
50
51/**
52 * Component devoted to polling stats from devices managed by the
53 * GeneralDeviceProvider.
54 */
55public class StatsPoller {
56
57 private static final int CORE_POOL_SIZE = 5;
58
59 private final Logger log = getLogger(getClass());
60
61 private final DeviceService deviceService;
62 private final MastershipService mastershipService;
63 private final DeviceProviderService providerService;
64
65 private final InternalDeviceListener deviceListener = new InternalDeviceListener();
66 private final MastershipListener mastershipListener = new InternalMastershipListener();
67 private final Striped<Lock> deviceLocks = Striped.lock(30);
68
69 private ScheduledExecutorService statsExecutor;
Carmelo Casconecb60f922019-03-08 10:50:15 -080070 private ConcurrentMap<DeviceId, ScheduledFuture<?>> statsPollingTasks;
71 private ConcurrentMap<DeviceId, Integer> pollFrequencies;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080072 private int statsPollInterval;
73
74 StatsPoller(DeviceService deviceService, MastershipService mastershipService,
75 DeviceProviderService providerService) {
76 this.deviceService = deviceService;
77 this.mastershipService = mastershipService;
78 this.providerService = providerService;
79 }
80
81
82 void activate(int statsPollInterval) {
83 checkArgument(statsPollInterval > 0, "statsPollInterval must be greater than 0");
Carmelo Cascone3977ea42019-02-28 13:43:42 -080084 statsExecutor = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
85 "onos/gdp-stats", "%d", log));
Carmelo Casconecb60f922019-03-08 10:50:15 -080086 statsPollingTasks = Maps.newConcurrentMap();
87 pollFrequencies = Maps.newConcurrentMap();
88 reschedule(statsPollInterval);
Carmelo Cascone3977ea42019-02-28 13:43:42 -080089 deviceService.addListener(deviceListener);
90 mastershipService.addListener(mastershipListener);
91 log.info("Started");
92 }
93
94 void reschedule(int statsPollInterval) {
95 checkArgument(statsPollInterval > 0, "statsPollInterval must be greater than 0");
96 this.statsPollInterval = statsPollInterval;
Carmelo Casconecb60f922019-03-08 10:50:15 -080097 // Consider all devices in the store, plus those of existing tasks
98 // (which for some reason might disappear from the store and so we want
99 // to cancel).
100 Streams.concat(
101 StreamSupport.stream(deviceService.getDevices().spliterator(), false)
102 .map(Device::id),
103 statsPollingTasks.keySet().stream())
104 .distinct()
105 .forEach(this::updatePollingTask);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800106 }
107
108 void deactivate() {
109 deviceService.removeListener(deviceListener);
110 mastershipService.removeListener(mastershipListener);
111
112 statsPollingTasks.values().forEach(t -> t.cancel(false));
113 statsPollingTasks.clear();
114 pollFrequencies.clear();
115 statsPollingTasks = null;
116 pollFrequencies = null;
117
118 statsExecutor.shutdownNow();
119 try {
120 statsExecutor.awaitTermination(5, TimeUnit.SECONDS);
121 } catch (InterruptedException e) {
122 log.warn("statsExecutor not terminated properly");
123 }
124 statsExecutor = null;
125
126 log.info("Stopped");
127 }
128
129
130 private void updatePollingTask(DeviceId deviceId) {
131 deviceLocks.get(deviceId).lock();
132 try {
133 final ScheduledFuture<?> existingTask = statsPollingTasks.get(deviceId);
Carmelo Casconecb60f922019-03-08 10:50:15 -0800134 final boolean shouldHaveTask = myScheme(deviceId)
135 && deviceService.getDevice(deviceId) != null
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800136 && deviceService.isAvailable(deviceId)
137 && mastershipService.isLocalMaster(deviceId)
138 && deviceService.getDevice(deviceId).is(PortStatisticsDiscovery.class);
Carmelo Casconecb60f922019-03-08 10:50:15 -0800139 final boolean pollIntervalChanged = !Objects.equals(
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800140 pollFrequencies.get(deviceId), statsPollInterval);
141
Carmelo Casconecb60f922019-03-08 10:50:15 -0800142 if (existingTask != null && (!shouldHaveTask || pollIntervalChanged)) {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800143 existingTask.cancel(false);
144 statsPollingTasks.remove(deviceId);
145 pollFrequencies.remove(deviceId);
146 log.info("Cancelled polling task for {}", deviceId);
147 }
148
149 if (shouldHaveTask) {
Carmelo Casconecb60f922019-03-08 10:50:15 -0800150 if (statsPollingTasks.containsKey(deviceId)) {
151 // There's already a task, with the same interval.
152 return;
153 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800154 final int delay = new SecureRandom().nextInt(statsPollInterval);
155 statsPollingTasks.put(deviceId, statsExecutor.scheduleAtFixedRate(
156 exceptionSafe(() -> updatePortStatistics(deviceId)),
157 delay, statsPollInterval, TimeUnit.SECONDS));
158 pollFrequencies.put(deviceId, statsPollInterval);
159 log.info("Started polling task for {} with interval {} seconds",
160 deviceId, statsPollInterval);
161 }
162 } finally {
163 deviceLocks.get(deviceId).unlock();
164 }
165 }
166
167 private void updatePortStatistics(DeviceId deviceId) {
168 final Device device = deviceService.getDevice(deviceId);
Carmelo Casconecb60f922019-03-08 10:50:15 -0800169 if (!device.is(PortStatisticsDiscovery.class)) {
170 log.error("Missing PortStatisticsDiscovery behaviour for {}", deviceId);
171 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800172 final Collection<PortStatistics> statistics = device.as(
173 PortStatisticsDiscovery.class).discoverPortStatistics();
174 if (!statistics.isEmpty()) {
175 providerService.updatePortStatistics(deviceId, statistics);
176 }
177 }
178
179 private Runnable exceptionSafe(Runnable runnable) {
180 return () -> {
181 try {
182 runnable.run();
183 } catch (Exception e) {
184 log.error("Unhandled exception in stats poller", e);
185 }
186 };
187 }
188
189 private class InternalMastershipListener implements MastershipListener {
190
191 @Override
192 public void event(MastershipEvent event) {
193 updatePollingTask(event.subject());
194 }
195
196 @Override
197 public boolean isRelevant(MastershipEvent event) {
Carmelo Casconecb60f922019-03-08 10:50:15 -0800198 return event.type() == MastershipEvent.Type.MASTER_CHANGED;
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800199 }
200 }
201
202 /**
203 * Listener for core device events.
204 */
205 private class InternalDeviceListener implements DeviceListener {
206 @Override
207 public void event(DeviceEvent event) {
208 updatePollingTask(event.subject().id());
209 }
210
211 @Override
212 public boolean isRelevant(DeviceEvent event) {
213 switch (event.type()) {
214 case DEVICE_ADDED:
215 case DEVICE_UPDATED:
216 case DEVICE_AVAILABILITY_CHANGED:
217 case DEVICE_REMOVED:
218 case DEVICE_SUSPENDED:
Carmelo Casconecb60f922019-03-08 10:50:15 -0800219 return true;
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800220 default:
221 return false;
222 }
223 }
224 }
225}