blob: 14269468612583dbb52e29287e68a34d83794722 [file] [log] [blame]
Carmelo Cascone3977ea42019-02-28 13:43:42 -08001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.general.device.impl;
18
19import com.google.common.collect.Maps;
20import com.google.common.util.concurrent.Striped;
21import org.onosproject.mastership.MastershipEvent;
22import org.onosproject.mastership.MastershipListener;
23import org.onosproject.mastership.MastershipService;
24import org.onosproject.net.Device;
25import org.onosproject.net.DeviceId;
26import org.onosproject.net.device.DeviceEvent;
27import org.onosproject.net.device.DeviceListener;
28import org.onosproject.net.device.DeviceProviderService;
29import org.onosproject.net.device.DeviceService;
30import org.onosproject.net.device.PortStatistics;
31import org.onosproject.net.device.PortStatisticsDiscovery;
32import org.slf4j.Logger;
33
34import java.security.SecureRandom;
35import java.util.Collection;
36import java.util.Map;
37import java.util.Objects;
38import java.util.concurrent.ScheduledExecutorService;
39import java.util.concurrent.ScheduledFuture;
40import java.util.concurrent.TimeUnit;
41import java.util.concurrent.locks.Lock;
42
43import static com.google.common.base.Preconditions.checkArgument;
44import static java.util.concurrent.Executors.newScheduledThreadPool;
45import static org.onlab.util.Tools.groupedThreads;
46import static org.onosproject.provider.general.device.impl.GeneralDeviceProvider.myScheme;
47import static org.slf4j.LoggerFactory.getLogger;
48
49/**
50 * Component devoted to polling stats from devices managed by the
51 * GeneralDeviceProvider.
52 */
53public class StatsPoller {
54
55 private static final int CORE_POOL_SIZE = 5;
56
57 private final Logger log = getLogger(getClass());
58
59 private final DeviceService deviceService;
60 private final MastershipService mastershipService;
61 private final DeviceProviderService providerService;
62
63 private final InternalDeviceListener deviceListener = new InternalDeviceListener();
64 private final MastershipListener mastershipListener = new InternalMastershipListener();
65 private final Striped<Lock> deviceLocks = Striped.lock(30);
66
67 private ScheduledExecutorService statsExecutor;
68 private Map<DeviceId, ScheduledFuture<?>> statsPollingTasks;
69 private Map<DeviceId, Integer> pollFrequencies;
70 private int statsPollInterval;
71
72 StatsPoller(DeviceService deviceService, MastershipService mastershipService,
73 DeviceProviderService providerService) {
74 this.deviceService = deviceService;
75 this.mastershipService = mastershipService;
76 this.providerService = providerService;
77 }
78
79
80 void activate(int statsPollInterval) {
81 checkArgument(statsPollInterval > 0, "statsPollInterval must be greater than 0");
82 this.statsPollInterval = statsPollInterval;
83 statsExecutor = newScheduledThreadPool(CORE_POOL_SIZE, groupedThreads(
84 "onos/gdp-stats", "%d", log));
85 statsPollingTasks = Maps.newHashMap();
86 pollFrequencies = Maps.newHashMap();
87 deviceService.getDevices().forEach(d -> updatePollingTask(d.id()));
88 deviceService.addListener(deviceListener);
89 mastershipService.addListener(mastershipListener);
90 log.info("Started");
91 }
92
93 void reschedule(int statsPollInterval) {
94 checkArgument(statsPollInterval > 0, "statsPollInterval must be greater than 0");
95 this.statsPollInterval = statsPollInterval;
96 statsPollingTasks.keySet().forEach(this::updatePollingTask);
97 }
98
99 void deactivate() {
100 deviceService.removeListener(deviceListener);
101 mastershipService.removeListener(mastershipListener);
102
103 statsPollingTasks.values().forEach(t -> t.cancel(false));
104 statsPollingTasks.clear();
105 pollFrequencies.clear();
106 statsPollingTasks = null;
107 pollFrequencies = null;
108
109 statsExecutor.shutdownNow();
110 try {
111 statsExecutor.awaitTermination(5, TimeUnit.SECONDS);
112 } catch (InterruptedException e) {
113 log.warn("statsExecutor not terminated properly");
114 }
115 statsExecutor = null;
116
117 log.info("Stopped");
118 }
119
120
121 private void updatePollingTask(DeviceId deviceId) {
122 deviceLocks.get(deviceId).lock();
123 try {
124 final ScheduledFuture<?> existingTask = statsPollingTasks.get(deviceId);
125 final boolean shouldHaveTask = deviceService.getDevice(deviceId) != null
126 && deviceService.isAvailable(deviceId)
127 && mastershipService.isLocalMaster(deviceId)
128 && deviceService.getDevice(deviceId).is(PortStatisticsDiscovery.class);
129 final boolean pollFrequencyChanged = !Objects.equals(
130 pollFrequencies.get(deviceId), statsPollInterval);
131
132 if (existingTask != null && (!shouldHaveTask || pollFrequencyChanged)) {
133 existingTask.cancel(false);
134 statsPollingTasks.remove(deviceId);
135 pollFrequencies.remove(deviceId);
136 log.info("Cancelled polling task for {}", deviceId);
137 }
138
139 if (shouldHaveTask) {
140 final int delay = new SecureRandom().nextInt(statsPollInterval);
141 statsPollingTasks.put(deviceId, statsExecutor.scheduleAtFixedRate(
142 exceptionSafe(() -> updatePortStatistics(deviceId)),
143 delay, statsPollInterval, TimeUnit.SECONDS));
144 pollFrequencies.put(deviceId, statsPollInterval);
145 log.info("Started polling task for {} with interval {} seconds",
146 deviceId, statsPollInterval);
147 }
148 } finally {
149 deviceLocks.get(deviceId).unlock();
150 }
151 }
152
153 private void updatePortStatistics(DeviceId deviceId) {
154 final Device device = deviceService.getDevice(deviceId);
155 final Collection<PortStatistics> statistics = device.as(
156 PortStatisticsDiscovery.class).discoverPortStatistics();
157 if (!statistics.isEmpty()) {
158 providerService.updatePortStatistics(deviceId, statistics);
159 }
160 }
161
162 private Runnable exceptionSafe(Runnable runnable) {
163 return () -> {
164 try {
165 runnable.run();
166 } catch (Exception e) {
167 log.error("Unhandled exception in stats poller", e);
168 }
169 };
170 }
171
172 private class InternalMastershipListener implements MastershipListener {
173
174 @Override
175 public void event(MastershipEvent event) {
176 updatePollingTask(event.subject());
177 }
178
179 @Override
180 public boolean isRelevant(MastershipEvent event) {
181 return event.type() == MastershipEvent.Type.MASTER_CHANGED
182 && myScheme(event.subject());
183 }
184 }
185
186 /**
187 * Listener for core device events.
188 */
189 private class InternalDeviceListener implements DeviceListener {
190 @Override
191 public void event(DeviceEvent event) {
192 updatePollingTask(event.subject().id());
193 }
194
195 @Override
196 public boolean isRelevant(DeviceEvent event) {
197 switch (event.type()) {
198 case DEVICE_ADDED:
199 case DEVICE_UPDATED:
200 case DEVICE_AVAILABILITY_CHANGED:
201 case DEVICE_REMOVED:
202 case DEVICE_SUSPENDED:
203 return myScheme(event.subject().id());
204 default:
205 return false;
206 }
207 }
208 }
209}