[ONOS-3203] End-to-end demo of Fault Management via SNMP.
This adds SNMP device-discovery, and a Fault Management app which makes alarms available to users via REST/GUI/CLI interfaces.
There is still code cleanup that could be done, but the aim of this commit is an end-to-end proof of concept.
To demonstrate:
1) /opt/onos/bin/onos-service
onos> app activate org.onosproject.snmp
onos> app activate org.onosproject.faultmanagement
2) SNMP devices are seeded via a config file. The default seed file contains connection details for devices (SNMP agents) that are reachable over the internet, e.g. demo.snmplabs.com.
cp /opt/onos/apache-karaf-3.0.3/etc/samples/org.onosproject.provider.snmp.device.impl.SnmpDeviceProvider.cfg /opt/onos/apache-karaf-3.0.3/etc/
3) ONOS will poll these SNMP devices and store their alarms.
4) You can now manipulate the alarms via REST (e.g. http://<onos>:8181/onos/v1/fm/alarms), via the CLI using the various "alarm-*" commands, or in the UI with an Alarms Overlay.
More info at https://wiki.onosproject.org/display/ONOS/Fault+Management
15/Dec/15: Updated to address review comments from Thomas Vachuska.
17/Dec/15: Updated coreService.registerApplication(name) as per https://gerrit.onosproject.org/#/c/6878/
Change-Id: I886f8511f178dc4600ab96e5ff10cc90329cabec
diff --git a/apps/faultmanagement/fmmgr/src/main/java/org/onosproject/faultmanagement/impl/AlarmsManager.java b/apps/faultmanagement/fmmgr/src/main/java/org/onosproject/faultmanagement/impl/AlarmsManager.java
index 1939231..f93dba6 100644
--- a/apps/faultmanagement/fmmgr/src/main/java/org/onosproject/faultmanagement/impl/AlarmsManager.java
+++ b/apps/faultmanagement/fmmgr/src/main/java/org/onosproject/faultmanagement/impl/AlarmsManager.java
@@ -15,13 +15,16 @@
*/
package org.onosproject.faultmanagement.impl;
-import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -47,8 +50,15 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.util.ItemNotFoundException;
+import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProvider;
import org.onosproject.incubator.net.faultmanagement.alarm.DefaultAlarm;
+import org.onosproject.net.device.DeviceService;
import org.osgi.service.component.ComponentContext;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.concurrent.atomic.AtomicLong;
+import static org.onlab.util.Tools.groupedThreads;
+import org.onosproject.net.Device;
/**
* Implementation of the Alarm service.
@@ -57,89 +67,133 @@
@Service
public class AlarmsManager implements AlarmService {
+ // For registering this application and obtaining ID generators
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
+ // For enumerating the available devices whose alarms are polled
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+ // Alarm source: devices are probed through it; results are expected back via a registered AlarmListener
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected AlarmProvider alarmProvider;
+
private final Logger log = getLogger(getClass());
private ApplicationId appId;
private IdGenerator idGenerator;
+ private ScheduledExecutorService alarmPollExecutor;
- @Property(name = "fmDevices", value = "127.0.0.1", label = "Instance-specific configurations")
- private String devConfigs;
-
- private final Map<AlarmId, Alarm> alarms = new ConcurrentHashMap<>();
-
-
+ // Counter backing generateAlarmId(). NOTE(review): idGenerator is obtained in activate() but not used in the visible code; confirm which one should mint alarm ids
private final AtomicLong alarmIdGenerator = new AtomicLong(0);
- @Override
- public Alarm update(Alarm replacement) {
+ private AlarmId generateAlarmId() {
+ return AlarmId.alarmId(alarmIdGenerator.incrementAndGet());
+ }
- final Alarm found = alarms.get(replacement.id());
+ private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 120;
+ @Property(name = "alarmPollFrequencySeconds", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
+ label = "Frequency (in seconds) for polling alarm from devices")
+ private int alarmPollFrequencySeconds = DEFAULT_POLL_FREQUENCY_SECONDS;
+ // NOTE(review): the visible part of modified() reads only clearedAlarmPurgeSeconds; confirm this poll period is ever updated from configuration
+ // TODO implement purging of old alarms.
+ private static final int DEFAULT_CLEAR_FREQUENCY_SECONDS = 500;
+ @Property(name = "clearedAlarmPurgeSeconds", intValue = DEFAULT_CLEAR_FREQUENCY_SECONDS,
+ label = "Frequency (in seconds) for deleting cleared alarms")
+ private int clearedAlarmPurgeFrequencySeconds = DEFAULT_CLEAR_FREQUENCY_SECONDS;
+
+ // In-memory alarm store keyed by alarm id. TODO: persist alarms to disk or a database.
+ private final Map<AlarmId, Alarm> alarms = new ConcurrentHashMap<>();
+
+ @Override
+ public Alarm updateBookkeepingFields(AlarmId id, boolean isAcknowledged, String assignedUser) {
+ // Only the user-editable bookkeeping fields (acknowledged, assigned user) change; an unknown id throws ItemNotFoundException
+ Alarm found = alarms.get(id);
if (found == null) {
- throw new ItemNotFoundException("Alarm with id " + replacement.id() + " found");
+ throw new ItemNotFoundException("Alarm with id " + id + " not found");
}
- final Alarm updated = new DefaultAlarm.Builder(found).
- withAcknowledged(replacement.acknowledged()).
- withAssignedUser(replacement.assignedUser()).build();
- alarms.put(replacement.id(), updated);
+ // Start from the stored alarm and override just the bookkeeping fields
+ Alarm updated = new DefaultAlarm.Builder(found).
+ withAcknowledged(isAcknowledged).
+ withAssignedUser(assignedUser).build();
+ alarms.put(id, updated);
+ return updated;
+ }
+
+ public Alarm clear(AlarmId id) {
+ // Unknown ids are tolerated: a warning is logged and null is returned, since the alarm may already be gone
+ Alarm found = alarms.get(id);
+ if (found == null) {
+ log.warn("id {} cant be cleared as it is already gone.", id);
+ return null;
+ }
+ Alarm updated = new DefaultAlarm.Builder(found).clear().build(); // presumably sets severity CLEARED (what getActiveAlarms filters on) - confirm in DefaultAlarm.Builder
+ alarms.put(id, updated);
return updated;
}
@Override
- public int getActiveAlarmCount(DeviceId deviceId) {
- //TODO
- throw new UnsupportedOperationException(NOT_SUPPORTED_YET);
+ public Map<Alarm.SeverityLevel, Long> getAlarmCounts(DeviceId deviceId) {
+ // Histogram of all stored alarms of this device keyed by severity (CLEARED ones included; absent severities have no key)
+ return getAlarms(deviceId).stream().collect(
+ Collectors.groupingBy(Alarm::severity, Collectors.counting()));
+
}
+
+ @Override
+ public Map<Alarm.SeverityLevel, Long> getAlarmCounts() {
+ // Histogram of all stored alarms keyed by severity (CLEARED ones included; absent severities have no key)
+ return getAlarms().stream().collect(
+ Collectors.groupingBy(Alarm::severity, Collectors.counting()));
+ }
+
+ // Message for operations not implemented yet (thrown with UnsupportedOperationException)
private static final String NOT_SUPPORTED_YET = "Not supported yet.";
@Override
public Alarm getAlarm(AlarmId alarmId) {
return nullIsNotFound(
- alarms.get(alarmId),
+ alarms.get(
+ checkNotNull(alarmId, "Alarm Id cannot be null")), // a null id fails with an explicit message
"Alarm is not found");
}
@Override
public Set<Alarm> getAlarms() {
- //TODO
- throw new UnsupportedOperationException(NOT_SUPPORTED_YET);
+ return new HashSet<>(alarms.values()); // snapshot copy of every stored alarm, cleared ones included
}
@Override
public Set<Alarm> getActiveAlarms() {
- // Enpty set if no values
- return alarms.isEmpty() ? new HashSet<>() : new HashSet<>(alarms.values());
-
- }
-
- private static DefaultAlarm generateFake(DeviceId deviceId, AlarmId alarmId) {
-
- return new DefaultAlarm.Builder(
- alarmId, deviceId, "NE is not reachable", Alarm.SeverityLevel.MAJOR, System.currentTimeMillis()).
- withTimeUpdated(System.currentTimeMillis()).
- withServiceAffecting(true)
- .withAcknowledged(true).
- withManuallyClearable(true)
- .withAssignedUser("user1").build();
+ return alarms.values().stream().filter(
+ a -> !a.severity().equals(Alarm.SeverityLevel.CLEARED)). // active = any severity other than CLEARED
+ collect(Collectors.toSet());
}
@Override
public Set<Alarm> getAlarms(Alarm.SeverityLevel severity) {
- //TODO
- throw new UnsupportedOperationException(NOT_SUPPORTED_YET);
+ return alarms.values().stream().filter(
+ a -> a.severity().equals(severity)). // exact severity match over all stored alarms
+ collect(Collectors.toSet());
}
@Override
public Set<Alarm> getAlarms(DeviceId deviceId) {
- //TODO
- throw new UnsupportedOperationException(NOT_SUPPORTED_YET);
+ return alarms.values().stream().filter(
+ a -> deviceId.equals(a.deviceId())). // every alarm of the device, cleared ones included
+ collect(Collectors.toSet());
+ }
+ // Active (non-CLEARED) alarms of one device; the baseline used when reconciling a device's alarm set
+ private Set<Alarm> getActiveAlarms(DeviceId deviceId) {
+ return getActiveAlarms().stream().filter(
+ a -> deviceId.equals(a.deviceId())).
+ collect(Collectors.toSet());
}
@Override
public Set<Alarm> getAlarms(DeviceId deviceId, AlarmEntityId source) {
- //TODO
- throw new UnsupportedOperationException(NOT_SUPPORTED_YET);
+ return getAlarms(deviceId).stream().filter(
+ a -> source.equals(a.source()) // source is dereferenced, so a null source throws NPE
+ ).collect(Collectors.toSet());
}
@Override
@@ -154,41 +208,92 @@
throw new UnsupportedOperationException(NOT_SUPPORTED_YET);
}
- private void discoverAlarmsForDevice(DeviceId deviceId) {
- final AlarmId alarmId = new AlarmId(alarmIdGenerator.incrementAndGet());
-
- // TODO In a new thread invoke SNMP Provider with DeviceId and device type and when done update our of alarms
- //
- alarms.put(alarmId, generateFake(deviceId, alarmId));
-
- }
+ private final AlarmListener alarmListener = new InternalAlarmListener();
private class InternalAlarmListener implements AlarmListener {
@Override
public void event(AlarmEvent event) {
- // TODO
- throw new UnsupportedOperationException(NOT_SUPPORTED_YET);
+ log.debug("AlarmsManager. InternalAlarmListener received {}", event);
+ try {
+ // Failures are logged here rather than propagated to the caller of event()
+ switch (event.type()) {
+ case DEVICE_DISCOVERY:
+ DeviceId deviceId = checkNotNull(event.getDeviceRefreshed(), "Refreshed device ID cannot be null");
+ log.info("New alarm set for {} received!", deviceId);
+ updateAlarms(event.subject(), deviceId);
+ break;
+ // Traps are not handled yet: log and skip instead of throwing an exception only to catch it below
+ case NOTIFICATION:
+ log.warn("Alarm Notifications (Traps) not expected or implemented yet. Received = {}", event);
+ break;
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ log.warn("Failed to process {}", event, e);
+ }
}
}
@Activate
public void activate(ComponentContext context) {
- log.info("Activate ...");
- appId = coreService.registerApplication("org.onos.faultmanagement.alarms");
+ appId = coreService.registerApplication("org.onosproject.faultmanagement.alarms");
idGenerator = coreService.getIdGenerator("alarm-ids");
- log.info("Started with appId={} idGenerator={}", appId, idGenerator);
+ log.info("Started with appId={}", appId);
- final boolean result = modified(context);
+ alarmProvider.addAlarmListener(alarmListener);
+ // Initial alarm synchronization for devices that are already available (listener registered just above)
+ probeActiveDevices();
+ // Apply component configuration; the result is only logged
+ boolean result = modified(context);
log.info("modified result = {}", result);
+ alarmPollExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/fm", "alarms-poll-%d"));
+ alarmPollExecutor.scheduleAtFixedRate(new PollAlarmsTask(),
+ alarmPollFrequencySeconds, alarmPollFrequencySeconds, SECONDS); // first poll after one full period, devices were probed above
+
+ }
+
+ /**
+ * Periodic task that keeps alarms up to date by probing all available devices. In a future release alarm
+ * notifications will be used as an optimization, so changes need not wait for the next poll; with simple polling,
+ * flapping alarms may also be missed.
+ */
+ private final class PollAlarmsTask implements Runnable {
+ // RuntimeExceptions are logged, not rethrown: under scheduleAtFixedRate an escaped exception suppresses all later runs
+ @Override
+ public void run() {
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("Interrupted, quitting");
+ return;
+ }
+ try {
+ probeActiveDevices();
+ } catch (RuntimeException e) {
+ log.error("Exception thrown during alarm synchronization process", e);
+ }
+ }
+ }
+ /** Asks the alarm provider to probe each currently available device; results are expected back via alarmListener. */
+ private void probeActiveDevices() {
+ Iterable<Device> devices = deviceService.getAvailableDevices();
+ log.debug("Refresh alarms for all available devices={} ...", devices); // runs every poll period, so kept below INFO
+ for (Device d : devices) {
+ log.debug("Lets tell alarm provider to refresh alarms for {} ...", d.id());
+ alarmProvider.triggerProbe(d.id());
+ }
}
@Deactivate
public void deactivate(ComponentContext context) {
log.info("Deactivate ...");
- // cfgService.unregisterProperties(getClass(), false);
+ alarmProvider.removeAlarmListener(alarmListener);
+ if (alarmPollExecutor != null) {
+ alarmPollExecutor.shutdownNow();
+ }
+ alarms.clear(); // alarms are held only in memory, so they are discarded on deactivation
log.info("Stopped");
}
@@ -199,24 +304,36 @@
log.info("No configuration file");
return false;
}
- final Dictionary<?, ?> properties = context.getProperties();
- final String ipaddresses = get(properties, "fmDevices");
- log.info("Settings: devConfigs={}", ipaddresses);
- if (!isNullOrEmpty(ipaddresses)) {
- discover(ipaddresses);
+ Dictionary<?, ?> properties = context.getProperties();
+ String clearedAlarmPurgeSeconds = get(properties, "clearedAlarmPurgeSeconds");
- }
+ log.info("Settings: clearedAlarmPurgeSeconds={}", clearedAlarmPurgeSeconds);
+
return true;
}
- private void discover(String ipaddresses) {
- for (String deviceEntry : ipaddresses.split(",")) {
- final DeviceId deviceId = DeviceId.deviceId(deviceEntry);
- if (deviceId != null) {
- log.info("Device {} needs to have its alarms refreshed!", deviceId);
- discoverAlarmsForDevice(deviceId);
- }
+ // Reconciles deviceId's stored active alarms with discoveredSet; synchronized to prevent duplicate NE alarms being raised
+ synchronized void updateAlarms(Set<Alarm> discoveredSet, DeviceId deviceId) {
+ Set<Alarm> storedSet = getActiveAlarms(deviceId);
+ log.trace("currentNeAlarms={}. discoveredAlarms={}", storedSet, discoveredSet);
+ // Fast path: nothing to do when the stored active set already matches the discovered set
+ if (CollectionUtils.isEqualCollection(storedSet, discoveredSet)) {
+ log.debug("Alarm lists are equivalent so no update for {}.", deviceId);
+ return;
}
+ // Clear stored alarms the element no longer reports. NOTE(review): these lookups rely on Alarm.equals while stored copies carry locally generated ids; confirm equals ignores the id, otherwise every poll clears and re-raises everything
+ storedSet.stream().filter(
+ (stored) -> (!discoveredSet.contains(stored))).forEach((stored) -> {
+ log.info("Alarm will be cleared as it is not on the element. Cleared alarm: {}.", stored);
+ clear(stored.id());
+ });
+ // Raise newly discovered alarms, each stored under a freshly generated local id
+ discoveredSet.stream().filter(
+ (discovered) -> (!storedSet.contains(discovered))).forEach((discovered) -> {
+ log.info("Alarm will be raised as it is missing. New alarm: {}.", discovered);
+ AlarmId id = generateAlarmId();
+ alarms.put(id, new DefaultAlarm.Builder(discovered).withId(id).build());
+ });
}
}