blob: 7f1ee403bdc50cc93a3c417ecf948f3f877dd5b2 [file] [log] [blame]
/*
 * Copyright 2016 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.incubator.net.faultmanagement.alarm.impl;
17
18import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Modified;
22import org.apache.felix.scr.annotations.Property;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.onosproject.incubator.net.faultmanagement.alarm.AlarmConsumer;
26import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProvider;
27import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProviderRegistry;
28import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProviderService;
29import org.onosproject.mastership.MastershipEvent;
30import org.onosproject.mastership.MastershipListener;
31import org.onosproject.mastership.MastershipService;
32import org.onosproject.net.Device;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.device.DeviceEvent;
35import org.onosproject.net.device.DeviceListener;
36import org.onosproject.net.device.DeviceService;
37import org.onosproject.net.provider.AbstractProvider;
38import org.onosproject.net.provider.ProviderId;
39import org.osgi.service.component.ComponentContext;
40import org.slf4j.Logger;
41
42import java.util.Dictionary;
43import java.util.concurrent.ExecutorService;
44import java.util.concurrent.Executors;
45import java.util.concurrent.ScheduledExecutorService;
46import java.util.concurrent.ScheduledFuture;
47import java.util.concurrent.TimeUnit;
48
49import static com.google.common.base.Strings.isNullOrEmpty;
50import static org.onlab.util.Tools.get;
51import static org.onlab.util.Tools.groupedThreads;
52import static org.slf4j.LoggerFactory.getLogger;
53
54/**
55 * Alarm provider capable of polling the environment using the device driver
56 * {@link AlarmConsumer} behaviour.
57 */
58@Component(immediate = true)
59public class PollingAlarmProvider extends AbstractProvider implements AlarmProvider {
60
61 private final Logger log = getLogger(getClass());
62
63 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
64 protected DeviceService deviceService;
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 protected MastershipService mastershipService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 protected AlarmProviderRegistry providerRegistry;
71
72 protected AlarmProviderService providerService;
73
74 protected ScheduledExecutorService alarmsExecutor;
75
76 private ScheduledFuture<?> scheduledTask;
77
78 private ExecutorService eventHandlingExecutor;
79
80 protected final MastershipListener mastershipListener = new InternalMastershipListener();
81
82 protected final DeviceListener deviceListener = new InternalDeviceListener();
83
84 private static final int CORE_POOL_SIZE = 10;
85
86 private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 60;
87 @Property(name = "alarmPollFrequencySeconds", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
88 label = "Frequency (in seconds) for polling alarm from devices")
89 protected int alarmPollFrequencySeconds = DEFAULT_POLL_FREQUENCY_SECONDS;
90
91 // TODO implement purging of old alarms.
92 private static final int DEFAULT_CLEAR_FREQUENCY_SECONDS = 500;
93 @Property(name = "clearedAlarmPurgeSeconds", intValue = DEFAULT_CLEAR_FREQUENCY_SECONDS,
94 label = "Frequency (in seconds) for deleting cleared alarms")
95 private int clearedAlarmPurgeFrequencySeconds = DEFAULT_CLEAR_FREQUENCY_SECONDS;
96
97 public PollingAlarmProvider() {
98 super(new ProviderId("default", "org.onosproject.core"));
99 }
100
101 @Activate
102 public void activate(ComponentContext context) {
103 alarmsExecutor = Executors.newScheduledThreadPool(CORE_POOL_SIZE);
104 eventHandlingExecutor =
105 Executors.newFixedThreadPool(CORE_POOL_SIZE,
106 groupedThreads("onos/pollingalarmprovider",
107 "device-installer-%d", log));
108
109 providerService = providerRegistry.register(this);
110
111 deviceService.addListener(deviceListener);
112 mastershipService.addListener(mastershipListener);
113
114 if (context == null) {
115 alarmPollFrequencySeconds = DEFAULT_POLL_FREQUENCY_SECONDS;
116 log.info("No component configuration");
117 } else {
118 Dictionary<?, ?> properties = context.getProperties();
119 alarmPollFrequencySeconds = getNewPollFrequency(properties, alarmPollFrequencySeconds);
120 }
121 scheduledTask = schedulePolling();
122 log.info("Started");
123 }
124
125 @Deactivate
126 public void deactivate() {
127 providerRegistry.unregister(this);
128 mastershipService.removeListener(mastershipListener);
129 deviceService.removeListener(deviceListener);
130 alarmsExecutor.shutdown();
131 providerService = null;
132 log.info("Stopped");
133 }
134
135 @Modified
136 public void modified(ComponentContext context) {
137 if (context == null) {
138 log.info("No component configuration");
139 return;
140 }
141
142 Dictionary<?, ?> properties = context.getProperties();
143 int newPollFrequency = getNewPollFrequency(properties, alarmPollFrequencySeconds);
144 if (newPollFrequency != alarmPollFrequencySeconds) {
145 alarmPollFrequencySeconds = newPollFrequency;
146 //stops the old scheduled task
147 scheduledTask.cancel(true);
148 //schedules new task at the new polling rate
149 scheduledTask = schedulePolling();
150 }
151 }
152
153 private ScheduledFuture schedulePolling() {
154 return alarmsExecutor.scheduleAtFixedRate(this::consumeAlarms,
155 alarmPollFrequencySeconds / 4, alarmPollFrequencySeconds,
156 TimeUnit.SECONDS);
157 }
158
159 private int getNewPollFrequency(Dictionary<?, ?> properties, int pollFrequency) {
160 int newPollFrequency;
161 try {
162 String s = get(properties, "pollFrequency");
163 newPollFrequency = isNullOrEmpty(s) ? pollFrequency : Integer.parseInt(s.trim());
164 } catch (NumberFormatException | ClassCastException e) {
165 newPollFrequency = DEFAULT_POLL_FREQUENCY_SECONDS;
166 }
167 return newPollFrequency;
168 }
169
170 @Override
171 public void triggerProbe(DeviceId deviceId) {
172 if (mastershipService.isLocalMaster(deviceId)) {
173 triggerProbe(deviceService.getDevice(deviceId));
174 }
175 }
176
177 private void triggerProbe(Device device) {
178 alarmsExecutor.submit(() -> consumeAlarms(device));
179 }
180
181 private void consumeAlarms() {
182 deviceService.getAvailableDevices().forEach(device -> {
183 if (mastershipService.isLocalMaster(device.id())) {
184 consumeAlarms(device);
185 }
186 });
187 }
188
189 private void consumeAlarms(Device device) {
190 if (device.is(AlarmConsumer.class)) {
191 providerService.updateAlarmList(device.id(),
192 device.as(AlarmConsumer.class).consumeAlarms());
193 } else {
194 log.info("Device {} does not support alarm consumer behaviour", device.id());
195 }
196 }
197
198 private class InternalMastershipListener implements MastershipListener {
199
200 @Override
201 public boolean isRelevant(MastershipEvent event) {
202 return mastershipService.isLocalMaster(event.subject());
203 }
204
205 @Override
206 public void event(MastershipEvent event) {
207 triggerProbe(event.subject());
208 }
209 }
210
211 /**
212 * Internal listener for device service events.
213 */
214 private class InternalDeviceListener implements DeviceListener {
215
216 @Override
217 public boolean isRelevant(DeviceEvent event) {
218 return event.type().equals(DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED)
219 && deviceService.isAvailable(event.subject().id());
220 }
221
222 @Override
223 public void event(DeviceEvent event) {
224 log.debug("InternalDeviceListener has got event from device-service{} with ", event);
225 eventHandlingExecutor.execute(() -> triggerProbe(event.subject().id()));
226 }
227
228 }
229}