blob: 70f60d1c26680c2c3a845fac5b84128410df553e [file] [log] [blame]
Andrea Campanellae72ac552016-04-11 10:04:52 -07001/*
Andrea Campanella637c8072016-04-12 17:29:17 -07002 * Copyright 2016-present Open Networking Laboratory
Andrea Campanellae72ac552016-04-11 10:04:52 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Andrea Campanella637c8072016-04-12 17:29:17 -070016package org.onosproject.faultmanagement.impl;
Andrea Campanellae72ac552016-04-11 10:04:52 -070017
18import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Modified;
22import org.apache.felix.scr.annotations.Property;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.onosproject.incubator.net.faultmanagement.alarm.AlarmConsumer;
26import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProvider;
27import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProviderRegistry;
28import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProviderService;
29import org.onosproject.mastership.MastershipEvent;
30import org.onosproject.mastership.MastershipListener;
31import org.onosproject.mastership.MastershipService;
32import org.onosproject.net.Device;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.device.DeviceEvent;
35import org.onosproject.net.device.DeviceListener;
36import org.onosproject.net.device.DeviceService;
37import org.onosproject.net.provider.AbstractProvider;
38import org.onosproject.net.provider.ProviderId;
39import org.osgi.service.component.ComponentContext;
40import org.slf4j.Logger;
41
42import java.util.Dictionary;
43import java.util.concurrent.ExecutorService;
44import java.util.concurrent.Executors;
45import java.util.concurrent.ScheduledExecutorService;
46import java.util.concurrent.ScheduledFuture;
47import java.util.concurrent.TimeUnit;
48
49import static com.google.common.base.Strings.isNullOrEmpty;
Yuta HIGUCHI1624df12016-07-21 16:54:33 -070050import static java.util.concurrent.Executors.newScheduledThreadPool;
Andrea Campanellae72ac552016-04-11 10:04:52 -070051import static org.onlab.util.Tools.get;
52import static org.onlab.util.Tools.groupedThreads;
53import static org.slf4j.LoggerFactory.getLogger;
54
55/**
56 * Alarm provider capable of polling the environment using the device driver
57 * {@link AlarmConsumer} behaviour.
58 */
59@Component(immediate = true)
60public class PollingAlarmProvider extends AbstractProvider implements AlarmProvider {
61
62 private final Logger log = getLogger(getClass());
63
64 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
65 protected DeviceService deviceService;
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 protected MastershipService mastershipService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
71 protected AlarmProviderRegistry providerRegistry;
72
73 protected AlarmProviderService providerService;
74
75 protected ScheduledExecutorService alarmsExecutor;
76
77 private ScheduledFuture<?> scheduledTask;
78
79 private ExecutorService eventHandlingExecutor;
80
81 protected final MastershipListener mastershipListener = new InternalMastershipListener();
82
83 protected final DeviceListener deviceListener = new InternalDeviceListener();
84
85 private static final int CORE_POOL_SIZE = 10;
86
87 private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 60;
88 @Property(name = "alarmPollFrequencySeconds", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
89 label = "Frequency (in seconds) for polling alarm from devices")
90 protected int alarmPollFrequencySeconds = DEFAULT_POLL_FREQUENCY_SECONDS;
91
92 // TODO implement purging of old alarms.
93 private static final int DEFAULT_CLEAR_FREQUENCY_SECONDS = 500;
94 @Property(name = "clearedAlarmPurgeSeconds", intValue = DEFAULT_CLEAR_FREQUENCY_SECONDS,
95 label = "Frequency (in seconds) for deleting cleared alarms")
96 private int clearedAlarmPurgeFrequencySeconds = DEFAULT_CLEAR_FREQUENCY_SECONDS;
97
98 public PollingAlarmProvider() {
99 super(new ProviderId("default", "org.onosproject.core"));
100 }
101
102 @Activate
103 public void activate(ComponentContext context) {
Yuta HIGUCHI1624df12016-07-21 16:54:33 -0700104 alarmsExecutor = newScheduledThreadPool(CORE_POOL_SIZE,
105 groupedThreads("onos/pollingalarmprovider",
106 "alarm-executor-%d", log));
Andrea Campanellae72ac552016-04-11 10:04:52 -0700107 eventHandlingExecutor =
108 Executors.newFixedThreadPool(CORE_POOL_SIZE,
109 groupedThreads("onos/pollingalarmprovider",
110 "device-installer-%d", log));
111
112 providerService = providerRegistry.register(this);
113
114 deviceService.addListener(deviceListener);
115 mastershipService.addListener(mastershipListener);
116
117 if (context == null) {
118 alarmPollFrequencySeconds = DEFAULT_POLL_FREQUENCY_SECONDS;
119 log.info("No component configuration");
120 } else {
121 Dictionary<?, ?> properties = context.getProperties();
122 alarmPollFrequencySeconds = getNewPollFrequency(properties, alarmPollFrequencySeconds);
123 }
124 scheduledTask = schedulePolling();
125 log.info("Started");
126 }
127
128 @Deactivate
129 public void deactivate() {
130 providerRegistry.unregister(this);
131 mastershipService.removeListener(mastershipListener);
132 deviceService.removeListener(deviceListener);
133 alarmsExecutor.shutdown();
134 providerService = null;
135 log.info("Stopped");
136 }
137
138 @Modified
139 public void modified(ComponentContext context) {
140 if (context == null) {
141 log.info("No component configuration");
142 return;
143 }
144
145 Dictionary<?, ?> properties = context.getProperties();
146 int newPollFrequency = getNewPollFrequency(properties, alarmPollFrequencySeconds);
147 if (newPollFrequency != alarmPollFrequencySeconds) {
148 alarmPollFrequencySeconds = newPollFrequency;
149 //stops the old scheduled task
150 scheduledTask.cancel(true);
151 //schedules new task at the new polling rate
152 scheduledTask = schedulePolling();
153 }
154 }
155
156 private ScheduledFuture schedulePolling() {
157 return alarmsExecutor.scheduleAtFixedRate(this::consumeAlarms,
158 alarmPollFrequencySeconds / 4, alarmPollFrequencySeconds,
159 TimeUnit.SECONDS);
160 }
161
162 private int getNewPollFrequency(Dictionary<?, ?> properties, int pollFrequency) {
163 int newPollFrequency;
164 try {
165 String s = get(properties, "pollFrequency");
166 newPollFrequency = isNullOrEmpty(s) ? pollFrequency : Integer.parseInt(s.trim());
167 } catch (NumberFormatException | ClassCastException e) {
168 newPollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
169 }
170 return newPollFrequency;
171 }
172
173 @Override
174 public void triggerProbe(DeviceId deviceId) {
175 if (mastershipService.isLocalMaster(deviceId)) {
176 triggerProbe(deviceService.getDevice(deviceId));
177 }
178 }
179
180 private void triggerProbe(Device device) {
181 alarmsExecutor.submit(() -> consumeAlarms(device));
182 }
183
184 private void consumeAlarms() {
185 deviceService.getAvailableDevices().forEach(device -> {
186 if (mastershipService.isLocalMaster(device.id())) {
187 consumeAlarms(device);
188 }
189 });
190 }
191
192 private void consumeAlarms(Device device) {
193 if (device.is(AlarmConsumer.class)) {
194 providerService.updateAlarmList(device.id(),
195 device.as(AlarmConsumer.class).consumeAlarms());
196 } else {
197 log.info("Device {} does not support alarm consumer behaviour", device.id());
198 }
199 }
200
201 private class InternalMastershipListener implements MastershipListener {
202
203 @Override
204 public boolean isRelevant(MastershipEvent event) {
205 return mastershipService.isLocalMaster(event.subject());
206 }
207
208 @Override
209 public void event(MastershipEvent event) {
210 triggerProbe(event.subject());
211 }
212 }
213
214 /**
215 * Internal listener for device service events.
216 */
217 private class InternalDeviceListener implements DeviceListener {
218
219 @Override
220 public boolean isRelevant(DeviceEvent event) {
221 return event.type().equals(DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED)
222 && deviceService.isAvailable(event.subject().id());
223 }
224
225 @Override
226 public void event(DeviceEvent event) {
227 log.debug("InternalDeviceListener has got event from device-service{} with ", event);
228 eventHandlingExecutor.execute(() -> triggerProbe(event.subject().id()));
229 }
230
231 }
232}