[ONOS-3203] End-to-end demo of Fault Management via SNMP.

This adds SNMP device-discovery, and a Fault Management app which makes alarms available to users via REST/GUI/CLI interfaces.
There is still code cleanup that could be done, but aim of this commit is an end-to-end proof of concept.

To demonstrate :

1) /opt/onos/bin/onos-service
onos> app activate org.onosproject.snmp
onos> app activate org.onosproject.faultmanagement

2) SNMP devices are seeded via config file. The default seed file contains connection details for devices (SNMP agents) available via internet  e.g. demo.snmplabs.com
cp /opt/onos/apache-karaf-3.0.3/etc/samples/org.onosproject.provider.snmp.device.impl.SnmpDeviceProvider.cfg   /opt/onos/apache-karaf-3.0.3/etc/

3) ONOS will poll these SNMP devices and store their alarms.

4) You can now manipulate the alarms via REST  e.g. http://<onos>:8181/onos/v1/fm/alarms , via CLI  via various "alarm-*” commands or in UI with an Alarms Overlay.

More info at https://wiki.onosproject.org/display/ONOS/Fault+Management

15/Dec/15: Updated regarding review comments from Thomas Vachuska.
17/Dec/15: Updated coreService.registerApplication(name) as per https://gerrit.onosproject.org/#/c/6878/

Change-Id: I886f8511f178dc4600ab96e5ff10cc90329cabec
diff --git a/apps/faultmanagement/fmmgr/src/main/java/org/onosproject/faultmanagement/impl/AlarmsManager.java b/apps/faultmanagement/fmmgr/src/main/java/org/onosproject/faultmanagement/impl/AlarmsManager.java
index 1939231..f93dba6 100644
--- a/apps/faultmanagement/fmmgr/src/main/java/org/onosproject/faultmanagement/impl/AlarmsManager.java
+++ b/apps/faultmanagement/fmmgr/src/main/java/org/onosproject/faultmanagement/impl/AlarmsManager.java
@@ -15,13 +15,16 @@
  */
 package org.onosproject.faultmanagement.impl;
 
-import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.common.base.Preconditions.checkNotNull;
 import java.util.Dictionary;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -47,8 +50,15 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.util.ItemNotFoundException;
+import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProvider;
 import org.onosproject.incubator.net.faultmanagement.alarm.DefaultAlarm;
+import org.onosproject.net.device.DeviceService;
 import org.osgi.service.component.ComponentContext;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.concurrent.atomic.AtomicLong;
+import static org.onlab.util.Tools.groupedThreads;
+import org.onosproject.net.Device;
 
 /**
  * Implementation of the Alarm service.
@@ -57,89 +67,133 @@
 @Service
 public class AlarmsManager implements AlarmService {
 
+    // For subscribing to device-related events
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected AlarmProvider alarmProvider;
+
     private final Logger log = getLogger(getClass());
     private ApplicationId appId;
     private IdGenerator idGenerator;
 
+    private ScheduledExecutorService alarmPollExecutor;
 
-    @Property(name = "fmDevices", value = "127.0.0.1", label = "Instance-specific configurations")
-    private String devConfigs;
-
-    private final Map<AlarmId, Alarm> alarms = new ConcurrentHashMap<>();
-
-
+    // dummy data
     private final AtomicLong alarmIdGenerator = new AtomicLong(0);
 
-    @Override
-    public Alarm update(Alarm replacement) {
+    private AlarmId generateAlarmId() {
+        return AlarmId.alarmId(alarmIdGenerator.incrementAndGet());
+    }
 
-        final Alarm found = alarms.get(replacement.id());
+    private static final int DEFAULT_POLL_FREQUENCY_SECONDS = 120;
+    @Property(name = "alarmPollFrequencySeconds", intValue = DEFAULT_POLL_FREQUENCY_SECONDS,
+            label = "Frequency (in seconds) for polling alarm from devices")
+    private int alarmPollFrequencySeconds = DEFAULT_POLL_FREQUENCY_SECONDS;
+
+    // TODO implement purging of old alarms.
+    private static final int DEFAULT_CLEAR_FREQUENCY_SECONDS = 500;
+    @Property(name = "clearedAlarmPurgeSeconds", intValue = DEFAULT_CLEAR_FREQUENCY_SECONDS,
+            label = "Frequency (in seconds) for deleting cleared alarms")
+    private int clearedAlarmPurgeFrequencySeconds = DEFAULT_CLEAR_FREQUENCY_SECONDS;
+
+    // TODO Later should must be persisted to disk or database
+    private final Map<AlarmId, Alarm> alarms = new ConcurrentHashMap<>();
+
+    @Override
+    public Alarm updateBookkeepingFields(AlarmId id, boolean isAcknowledged, String assignedUser) {
+
+        Alarm found = alarms.get(id);
         if (found == null) {
-            throw new ItemNotFoundException("Alarm with id " + replacement.id() + " found");
+            throw new ItemNotFoundException("Alarm with id " + id + " found");
         }
-        final Alarm updated = new DefaultAlarm.Builder(found).
-                withAcknowledged(replacement.acknowledged()).
-                withAssignedUser(replacement.assignedUser()).build();
-        alarms.put(replacement.id(), updated);
+
+        Alarm updated = new DefaultAlarm.Builder(found).
+                withAcknowledged(isAcknowledged).
+                withAssignedUser(assignedUser).build();
+        alarms.put(id, updated);
+        return updated;
+    }
+
+    public Alarm clear(AlarmId id) {
+
+        Alarm found = alarms.get(id);
+        if (found == null) {
+            log.warn("id {} cant be cleared as it is already gone.", id);
+            return null;
+        }
+        Alarm updated = new DefaultAlarm.Builder(found).clear().build();
+        alarms.put(id, updated);
         return updated;
     }
 
     @Override
-    public int getActiveAlarmCount(DeviceId deviceId) {
-        //TODO
-        throw new UnsupportedOperationException(NOT_SUPPORTED_YET);
+    public Map<Alarm.SeverityLevel, Long> getAlarmCounts(DeviceId deviceId) {
+
+        return getAlarms(deviceId).stream().collect(
+                Collectors.groupingBy(Alarm::severity, Collectors.counting()));
+
     }
+
+    @Override
+    public Map<Alarm.SeverityLevel, Long> getAlarmCounts() {
+
+        return getAlarms().stream().collect(
+                Collectors.groupingBy(Alarm::severity, Collectors.counting()));
+    }
+
+
     private static final String NOT_SUPPORTED_YET = "Not supported yet.";
 
     @Override
     public Alarm getAlarm(AlarmId alarmId) {
         return nullIsNotFound(
-                alarms.get(alarmId),
+                alarms.get(
+                        checkNotNull(alarmId, "Alarm Id cannot be null")),
                 "Alarm is not found");
     }
 
     @Override
     public Set<Alarm> getAlarms() {
-        //TODO
-        throw new UnsupportedOperationException(NOT_SUPPORTED_YET);
+        return new HashSet<>(alarms.values());
     }
 
     @Override
     public Set<Alarm> getActiveAlarms() {
-        // Enpty set if no values
-        return alarms.isEmpty() ? new HashSet<>() : new HashSet<>(alarms.values());
-
-    }
-
-    private static DefaultAlarm generateFake(DeviceId deviceId, AlarmId alarmId) {
-
-        return new DefaultAlarm.Builder(
-                alarmId, deviceId, "NE is not reachable", Alarm.SeverityLevel.MAJOR, System.currentTimeMillis()).
-                withTimeUpdated(System.currentTimeMillis()).
-                withServiceAffecting(true)
-                .withAcknowledged(true).
-                withManuallyClearable(true)
-                .withAssignedUser("user1").build();
+        return alarms.values().stream().filter(
+                a -> !a.severity().equals(Alarm.SeverityLevel.CLEARED)).
+                collect(Collectors.toSet());
     }
 
     @Override
     public Set<Alarm> getAlarms(Alarm.SeverityLevel severity) {
-        //TODO
-        throw new UnsupportedOperationException(NOT_SUPPORTED_YET);
+        return alarms.values().stream().filter(
+                a -> a.severity().equals(severity)).
+                collect(Collectors.toSet());
     }
 
     @Override
     public Set<Alarm> getAlarms(DeviceId deviceId) {
-        //TODO
-        throw new UnsupportedOperationException(NOT_SUPPORTED_YET);
+        return alarms.values().stream().filter(
+                a -> deviceId.equals(a.deviceId())).
+                collect(Collectors.toSet());
+    }
+
+    private Set<Alarm> getActiveAlarms(DeviceId deviceId) {
+        return getActiveAlarms().stream().filter(
+                a -> deviceId.equals(a.deviceId())).
+                collect(Collectors.toSet());
     }
 
     @Override
     public Set<Alarm> getAlarms(DeviceId deviceId, AlarmEntityId source) {
-        //TODO
-        throw new UnsupportedOperationException(NOT_SUPPORTED_YET);
+        return getAlarms(deviceId).stream().filter(
+                a -> source.equals(a.source())
+        ).collect(Collectors.toSet());
     }
 
     @Override
@@ -154,41 +208,92 @@
         throw new UnsupportedOperationException(NOT_SUPPORTED_YET);
     }
 
-    private void discoverAlarmsForDevice(DeviceId deviceId) {
-        final AlarmId alarmId = new AlarmId(alarmIdGenerator.incrementAndGet());
-
-        // TODO In a new thread invoke SNMP Provider with DeviceId and device type and when done update our of alarms
-        //
-        alarms.put(alarmId, generateFake(deviceId, alarmId));
-
-    }
+    private final AlarmListener alarmListener = new InternalAlarmListener();
 
     private class InternalAlarmListener implements AlarmListener {
 
         @Override
         public void event(AlarmEvent event) {
-            // TODO
-            throw new UnsupportedOperationException(NOT_SUPPORTED_YET);
+            log.debug("AlarmsManager. InternalAlarmListener received {}", event);
+            try {
+
+                switch (event.type()) {
+                    case DEVICE_DISCOVERY:
+                        DeviceId deviceId = checkNotNull(event.getDeviceRefreshed(), "Listener cannot be null");
+                        log.info("New alarm set for {} received!", deviceId);
+                        updateAlarms(event.subject(), deviceId);
+                        break;
+
+                    case NOTIFICATION:
+                        throw new IllegalArgumentException(
+                                "Alarm Notifications (Traps) not expected or implemented yet. Received =" + event);
+                    default:
+                        break;
+                }
+            } catch (Exception e) {
+                log.warn("Failed to process {}", event, e);
+            }
         }
     }
 
     @Activate
     public void activate(ComponentContext context) {
-        log.info("Activate ...");
-        appId = coreService.registerApplication("org.onos.faultmanagement.alarms");
+        appId = coreService.registerApplication("org.onosproject.faultmanagement.alarms");
         idGenerator = coreService.getIdGenerator("alarm-ids");
-        log.info("Started with appId={} idGenerator={}", appId, idGenerator);
+        log.info("Started with appId={}", appId);
 
-        final boolean result = modified(context);
+        alarmProvider.addAlarmListener(alarmListener);
+
+        probeActiveDevices();
+
+        boolean result = modified(context);
         log.info("modified result = {}", result);
 
+        alarmPollExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/fm", "alarms-poll-%d"));
+        alarmPollExecutor.scheduleAtFixedRate(new PollAlarmsTask(),
+                alarmPollFrequencySeconds, alarmPollFrequencySeconds, SECONDS);
+
+    }
+
+    /**
+     * Auxiliary task to keep alarms up to date. IN future release alarm-notifications will be used as an optimization
+     * so we dont have to wait until polling to detect changes. Furthermore with simple polling flapping alarms may be
+     * missed.
+     */
+    private final class PollAlarmsTask implements Runnable {
+
+        @Override
+        public void run() {
+            if (Thread.currentThread().isInterrupted()) {
+                log.info("Interrupted, quitting");
+                return;
+            }
+            try {
+                probeActiveDevices();
+            } catch (RuntimeException e) {
+                log.error("Exception thrown during alarm synchronization process", e);
+            }
+        }
+    }
+
+    private void probeActiveDevices() {
+        Iterable<Device> devices = deviceService.getAvailableDevices();
+        log.info("Refresh alarms for all available devices={} ...", devices);
+        for (Device d : devices) {
+            log.info("Lets tell alarm provider to refresh alarms for {} ...", d.id());
+            alarmProvider.triggerProbe(d.id());
+        }
     }
 
     @Deactivate
     public void deactivate(ComponentContext context) {
         log.info("Deactivate ...");
-        //     cfgService.unregisterProperties(getClass(), false);
+        alarmProvider.removeAlarmListener(alarmListener);
 
+        if (alarmPollExecutor != null) {
+            alarmPollExecutor.shutdownNow();
+        }
+        alarms.clear();
         log.info("Stopped");
     }
 
@@ -199,24 +304,36 @@
             log.info("No configuration file");
             return false;
         }
-        final Dictionary<?, ?> properties = context.getProperties();
-        final String ipaddresses = get(properties, "fmDevices");
-        log.info("Settings: devConfigs={}", ipaddresses);
-        if (!isNullOrEmpty(ipaddresses)) {
-            discover(ipaddresses);
+        Dictionary<?, ?> properties = context.getProperties();
+        String clearedAlarmPurgeSeconds = get(properties, "clearedAlarmPurgeSeconds");
 
-        }
+        log.info("Settings: clearedAlarmPurgeSeconds={}", clearedAlarmPurgeSeconds);
+
         return true;
     }
 
-    private void discover(String ipaddresses) {
-        for (String deviceEntry : ipaddresses.split(",")) {
-            final DeviceId deviceId = DeviceId.deviceId(deviceEntry);
-            if (deviceId != null) {
-                log.info("Device {} needs to have its alarms refreshed!", deviceId);
-                discoverAlarmsForDevice(deviceId);
-            }
+    // Synchronised to prevent duplicate NE alarms being raised
+    synchronized void updateAlarms(Set<Alarm> discoveredSet, DeviceId deviceId) {
+        Set<Alarm> storedSet = getActiveAlarms(deviceId);
+        log.trace("currentNeAlarms={}. discoveredAlarms={}", storedSet, discoveredSet);
+
+        if (CollectionUtils.isEqualCollection(storedSet, discoveredSet)) {
+            log.debug("Alarm lists are equivalent so no update for {}.", deviceId);
+            return;
         }
+
+        storedSet.stream().filter(
+                (stored) -> (!discoveredSet.contains(stored))).forEach((stored) -> {
+                    log.info("Alarm will be cleared as it is not on the element. Cleared alarm: {}.", stored);
+                    clear(stored.id());
+                });
+
+        discoveredSet.stream().filter(
+                (discovered) -> (!storedSet.contains(discovered))).forEach((discovered) -> {
+            log.info("Alarm will be raised as it is missing. New alarm: {}.", discovered);
+            AlarmId id = generateAlarmId();
+            alarms.put(id, new DefaultAlarm.Builder(discovered).withId(id).build());
+        });
     }
 
 }