blob: 7f1ee403bdc50cc93a3c417ecf948f3f877dd5b2 [file] [log] [blame]
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.net.faultmanagement.alarm.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.incubator.net.faultmanagement.alarm.AlarmConsumer;
import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProvider;
import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProviderRegistry;
import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProviderService;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.util.Dictionary;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
 * Alarm provider capable of polling the environment using the device driver
 * {@link AlarmConsumer} behaviour.
 *
 * <p>All available devices mastered by this node are polled periodically at a
 * configurable rate. A device is additionally probed on demand when a
 * mastership event for it is received or when it becomes available.
 */
@Component(immediate = true)
public class PollingAlarmProvider extends AbstractProvider implements AlarmProvider {

    private final Logger log = getLogger(getClass());

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected DeviceService deviceService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected AlarmProviderRegistry providerRegistry;

    protected AlarmProviderService providerService;

    protected ScheduledExecutorService alarmsExecutor;

    private ScheduledFuture<?> scheduledTask;

    private ExecutorService eventHandlingExecutor;

    protected final MastershipListener mastershipListener = new InternalMastershipListener();

    protected final DeviceListener deviceListener = new InternalDeviceListener();

    private static final int CORE_POOL_SIZE = 10;

    private static final String THREAD_GROUP = "onos/pollingalarmprovider";

    // Single source of truth for the configuration key: the same name is used
    // to declare the property and to read it back from the component context.
    private static final String POLL_FREQUENCY_SECONDS = "alarmPollFrequencySeconds";

    private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 60;
    @Property(name = POLL_FREQUENCY_SECONDS, intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
            label = "Frequency (in seconds) for polling alarm from devices")
    protected int alarmPollFrequencySeconds = DEFAULT_POLL_FREQUENCY_SECONDS;

    // TODO implement purging of old alarms.
    private static final int DEFAULT_CLEAR_FREQUENCY_SECONDS = 500;
    @Property(name = "clearedAlarmPurgeSeconds", intValue = DEFAULT_CLEAR_FREQUENCY_SECONDS,
            label = "Frequency (in seconds) for deleting cleared alarms")
    private int clearedAlarmPurgeFrequencySeconds = DEFAULT_CLEAR_FREQUENCY_SECONDS;

    /**
     * Creates the provider with the provider id
     * {@code ("default", "org.onosproject.core")}.
     */
    public PollingAlarmProvider() {
        super(new ProviderId("default", "org.onosproject.core"));
    }

    /**
     * Creates the executors, registers this provider and its listeners, reads
     * the poll frequency from the component configuration and starts the
     * periodic polling task.
     *
     * @param context component context; when null the default poll frequency is used
     */
    @Activate
    public void activate(ComponentContext context) {
        alarmsExecutor = Executors.newScheduledThreadPool(CORE_POOL_SIZE,
                groupedThreads(THREAD_GROUP, "alarm-executor-%d", log));
        eventHandlingExecutor =
                Executors.newFixedThreadPool(CORE_POOL_SIZE,
                                             groupedThreads(THREAD_GROUP,
                                                            "device-installer-%d", log));

        providerService = providerRegistry.register(this);

        deviceService.addListener(deviceListener);
        mastershipService.addListener(mastershipListener);

        if (context == null) {
            alarmPollFrequencySeconds = DEFAULT_POLL_FREQUENCY_SECONDS;
            log.info("No component configuration");
        } else {
            Dictionary<?, ?> properties = context.getProperties();
            alarmPollFrequencySeconds = getNewPollFrequency(properties, alarmPollFrequencySeconds);
        }
        scheduledTask = schedulePolling();
        log.info("Started");
    }

    /**
     * Unregisters this provider and its listeners, cancels the polling task
     * and shuts down both executors.
     */
    @Deactivate
    public void deactivate() {
        providerRegistry.unregister(this);
        mastershipService.removeListener(mastershipListener);
        deviceService.removeListener(deviceListener);
        if (scheduledTask != null) {
            // Let a poll that is already running finish; just stop future runs.
            scheduledTask.cancel(false);
            scheduledTask = null;
        }
        if (alarmsExecutor != null) {
            alarmsExecutor.shutdown();
        }
        // Previously this executor was never shut down and leaked its threads.
        if (eventHandlingExecutor != null) {
            eventHandlingExecutor.shutdown();
        }
        providerService = null;
        log.info("Stopped");
    }

    /**
     * Re-reads the poll frequency from the component configuration and, if it
     * changed, reschedules the polling task at the new rate.
     *
     * @param context component context; when null nothing is changed
     */
    @Modified
    public void modified(ComponentContext context) {
        if (context == null) {
            log.info("No component configuration");
            return;
        }

        Dictionary<?, ?> properties = context.getProperties();
        int newPollFrequency = getNewPollFrequency(properties, alarmPollFrequencySeconds);
        if (newPollFrequency != alarmPollFrequencySeconds) {
            alarmPollFrequencySeconds = newPollFrequency;
            //stops the old scheduled task
            if (scheduledTask != null) {
                scheduledTask.cancel(true);
            }
            //schedules new task at the new polling rate
            scheduledTask = schedulePolling();
        }
    }

    /**
     * Schedules the periodic polling of all devices at the current poll
     * frequency, with an initial delay of a quarter of that period.
     *
     * @return handle of the scheduled task
     */
    private ScheduledFuture<?> schedulePolling() {
        return alarmsExecutor.scheduleAtFixedRate(this::consumeAlarms,
                                                  alarmPollFrequencySeconds / 4,
                                                  alarmPollFrequencySeconds,
                                                  TimeUnit.SECONDS);
    }

    /**
     * Extracts the poll frequency from the given configuration properties.
     *
     * @param properties    component configuration properties
     * @param pollFrequency value to return when the property is absent or empty
     * @return the configured frequency in seconds; the default frequency when
     *         the configured value is not a number or is not strictly positive
     */
    private int getNewPollFrequency(Dictionary<?, ?> properties, int pollFrequency) {
        int newPollFrequency;
        try {
            String s = get(properties, POLL_FREQUENCY_SECONDS);
            newPollFrequency = isNullOrEmpty(s) ? pollFrequency : Integer.parseInt(s.trim());
        } catch (NumberFormatException | ClassCastException e) {
            log.warn("Invalid value for {}, using default of {} seconds",
                     POLL_FREQUENCY_SECONDS, DEFAULT_POLL_FREQUENCY_SECONDS);
            return DEFAULT_POLL_FREQUENCY_SECONDS;
        }
        if (newPollFrequency <= 0) {
            // scheduleAtFixedRate rejects a non-positive period.
            log.warn("Non-positive value {} for {}, using default of {} seconds",
                     newPollFrequency, POLL_FREQUENCY_SECONDS, DEFAULT_POLL_FREQUENCY_SECONDS);
            return DEFAULT_POLL_FREQUENCY_SECONDS;
        }
        return newPollFrequency;
    }

    /**
     * Asynchronously polls the alarms of the given device, provided that this
     * node is its master and the device is known to the device service.
     *
     * @param deviceId identifier of the device to probe
     */
    @Override
    public void triggerProbe(DeviceId deviceId) {
        if (mastershipService.isLocalMaster(deviceId)) {
            Device device = deviceService.getDevice(deviceId);
            if (device == null) {
                log.debug("Device {} not found, skipping alarm probe", deviceId);
                return;
            }
            triggerProbe(device);
        }
    }

    /**
     * Submits an alarm poll of the given device to the alarms executor.
     *
     * @param device device to poll
     */
    private void triggerProbe(Device device) {
        alarmsExecutor.submit(() -> consumeAlarmsSafely(device));
    }

    /**
     * Polls the alarms of every available device mastered by this node.
     * Never throws: an exception escaping a task scheduled at fixed rate would
     * suppress all of its subsequent executions.
     */
    private void consumeAlarms() {
        try {
            deviceService.getAvailableDevices().forEach(device -> {
                if (mastershipService.isLocalMaster(device.id())) {
                    consumeAlarmsSafely(device);
                }
            });
        } catch (RuntimeException e) {
            log.warn("Alarm polling cycle failed", e);
        }
    }

    /**
     * Polls the alarms of a single device, logging instead of propagating any
     * failure so that one device cannot break polling of the others.
     *
     * @param device device to poll
     */
    private void consumeAlarmsSafely(Device device) {
        try {
            consumeAlarms(device);
        } catch (RuntimeException e) {
            log.warn("Failed to consume alarms from device {}", device.id(), e);
        }
    }

    /**
     * Retrieves the alarms of the given device through its
     * {@link AlarmConsumer} behaviour and hands them to the provider service.
     * Devices without that behaviour are only logged.
     *
     * @param device device to poll
     */
    private void consumeAlarms(Device device) {
        // deactivate() clears the field while already-submitted tasks may still run.
        AlarmProviderService service = providerService;
        if (service == null) {
            return;
        }
        if (device.is(AlarmConsumer.class)) {
            service.updateAlarmList(device.id(),
                                    device.as(AlarmConsumer.class).consumeAlarms());
        } else {
            log.info("Device {} does not support alarm consumer behaviour", device.id());
        }
    }

    /**
     * Internal listener for mastership events; probes a device whenever an
     * event is received for a device mastered by this node.
     */
    private class InternalMastershipListener implements MastershipListener {

        @Override
        public boolean isRelevant(MastershipEvent event) {
            return mastershipService.isLocalMaster(event.subject());
        }

        @Override
        public void event(MastershipEvent event) {
            triggerProbe(event.subject());
        }
    }

    /**
     * Internal listener for device service events; probes a device when its
     * availability changed and it is now available.
     */
    private class InternalDeviceListener implements DeviceListener {

        @Override
        public boolean isRelevant(DeviceEvent event) {
            return event.type().equals(DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED)
                    && deviceService.isAvailable(event.subject().id());
        }

        @Override
        public void event(DeviceEvent event) {
            log.debug("InternalDeviceListener has got event {} from device-service", event);
            // Hand off so the device-service event thread is not blocked.
            eventHandlingExecutor.execute(() -> triggerProbe(event.subject().id()));
        }
    }
}