Added a detection mechanism for long-running or dead-locked event sinks and listeners.

Change-Id: I21308b058902a94c31c34c2ec2878cd13213874e
diff --git a/core/api/src/main/java/org/onosproject/event/DefaultEventSinkRegistry.java b/core/api/src/main/java/org/onosproject/event/DefaultEventSinkRegistry.java
index 734d9b5..be6ddb6 100644
--- a/core/api/src/main/java/org/onosproject/event/DefaultEventSinkRegistry.java
+++ b/core/api/src/main/java/org/onosproject/event/DefaultEventSinkRegistry.java
@@ -51,6 +51,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public <E extends Event> EventSink<E> getSink(Class<E> eventClass) {
+        checkNotNull(eventClass, "Event class cannot be null");
         return (EventSink<E>) sinks.get(eventClass);
     }
 
diff --git a/core/api/src/main/java/org/onosproject/event/EventSink.java b/core/api/src/main/java/org/onosproject/event/EventSink.java
index c6e5ac9..221b322 100644
--- a/core/api/src/main/java/org/onosproject/event/EventSink.java
+++ b/core/api/src/main/java/org/onosproject/event/EventSink.java
@@ -27,4 +27,10 @@
      */
     void process(E event);
 
+    /**
+     * Handles notification that event processing time limit has been exceeded.
+     */
+    default void onProcessLimit() {
+    }
+
 }
diff --git a/core/api/src/main/java/org/onosproject/event/ListenerRegistry.java b/core/api/src/main/java/org/onosproject/event/ListenerRegistry.java
index 4164441..c2e1aed 100644
--- a/core/api/src/main/java/org/onosproject/event/ListenerRegistry.java
+++ b/core/api/src/main/java/org/onosproject/event/ListenerRegistry.java
@@ -35,6 +35,9 @@
 
     private volatile boolean shutdown = false;
 
+    private long lastStart;
+    private L lastListener;
+
     /**
      * Set of listeners that have registered.
      */
@@ -67,13 +70,26 @@
     public void process(E event) {
         for (L listener : listeners) {
             try {
+                lastListener = listener;
+                lastStart = System.currentTimeMillis();
                 listener.event(event);
+                lastStart = 0;
             } catch (Exception error) {
                 reportProblem(event, error);
             }
         }
     }
 
+    @Override
+    public void onProcessLimit() {
+        if (lastStart > 0) {
+            log.error("Listener {} exceeded execution time limit: {} ms; ejected",
+                      lastListener.getClass().getName(),
+                      System.currentTimeMillis() - lastStart);
+            removeListener(lastListener);
+        }
+    }
+
     /**
      * Predicate indicating whether we should throw an exception if the
      * argument to {@link #removeListener} is not in the current set of
diff --git a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
index 8850104..382846c 100644
--- a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
+++ b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
@@ -19,6 +19,7 @@
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.SharedExecutors;
 import org.onosproject.event.AbstractEvent;
 import org.onosproject.event.DefaultEventSinkRegistry;
 import org.onosproject.event.Event;
@@ -26,8 +27,10 @@
 import org.onosproject.event.EventSink;
 import org.slf4j.Logger;
 
+import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
@@ -42,8 +45,14 @@
 public class CoreEventDispatcher extends DefaultEventSinkRegistry
         implements EventDeliveryService {
 
+    // Maximum number of millis a sink can take to process an event.
+    private static final long MAX_EXECUTE_MS = 1_000;
+    private static final long WATCHDOG_MS = MAX_EXECUTE_MS / 4;
+
     private final Logger log = getLogger(getClass());
 
+    private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
+
     private final ExecutorService executor =
             newSingleThreadExecutor(groupedThreads("onos/event", "dispatch-%d"));
 
@@ -51,34 +60,45 @@
     private static final Event KILL_PILL = new AbstractEvent(null, 0) {
     };
 
-    private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
+    private DispatchLoop dispatchLoop;
 
-    private volatile boolean stopped = false;
+    // Means to detect long-running sinks
+    private TimerTask watchdog;
+    private EventSink lastSink;
+    private long lastStart = 0;
+    private Future<?> dispatchFuture;
 
     @Override
     public void post(Event event) {
-        events.add(event);
+        if (!events.add(event)) {
+            log.error("Unable to post event {}", event);
+        }
     }
 
     @Activate
     public void activate() {
-        stopped = false;
-        executor.execute(new DispatchLoop());
+        dispatchLoop = new DispatchLoop();
+        dispatchFuture = executor.submit(dispatchLoop);
+        watchdog = new Watchdog();
+        SharedExecutors.getTimer().schedule(watchdog, WATCHDOG_MS, WATCHDOG_MS);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
-        stopped = true;
+        dispatchLoop.stop();
+        watchdog.cancel();
         post(KILL_PILL);
         log.info("Stopped");
     }
 
     // Auxiliary event dispatching loop that feeds off the events queue.
     private class DispatchLoop implements Runnable {
+        private volatile boolean stopped;
+
         @Override
-        @SuppressWarnings("unchecked")
         public void run() {
+            stopped = false;
             log.info("Dispatch loop initiated");
             while (!stopped) {
                 try {
@@ -87,22 +107,52 @@
                     if (event == KILL_PILL) {
                         break;
                     }
-
-                    // Locate the sink for the event class and use it to
-                    // process the event
-                    EventSink sink = getSink(event.getClass());
-                    if (sink != null) {
-                        sink.process(event);
-                    } else {
-                        log.warn("No sink registered for event class {}",
-                                 event.getClass());
-                    }
+                    process(event);
                 } catch (Exception e) {
                     log.warn("Error encountered while dispatching event:", e);
                 }
             }
             log.info("Dispatch loop terminated");
         }
+
+        // Locate the sink for the event class and use it to process the event
+        @SuppressWarnings("unchecked")
+        private void process(Event event) {
+            EventSink sink = getSink(event.getClass());
+            if (sink != null) {
+                lastSink = sink;
+                lastStart = System.currentTimeMillis();
+                sink.process(event);
+                lastStart = 0;
+            } else {
+                log.warn("No sink registered for event class {}",
+                         event.getClass());
+            }
+        }
+
+        void stop() {
+            stopped = true;
+        }
     }
 
+    // Monitors event sinks to make sure none take too long to execute.
+    private class Watchdog extends TimerTask {
+        @Override
+        public void run() {
+            long delta = System.currentTimeMillis() - lastStart;
+            if (lastStart > 0 && delta > MAX_EXECUTE_MS) {
+                log.error("Event sink {} exceeded execution time limit: {} ms",
+                          lastSink.getClass().getName(), delta);
+
+                // Notify the sink that it has exceeded its time limit.
+                lastSink.onProcessLimit();
+
+                // Cancel the old dispatch loop and submit a new one.
+                dispatchLoop.stop();
+                dispatchLoop = new DispatchLoop();
+                dispatchFuture.cancel(true);
+                dispatchFuture = executor.submit(dispatchLoop);
+            }
+        }
+    }
 }
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index d12664b..9a2b9e2 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -58,6 +58,7 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -423,8 +424,8 @@
         }
 
         @Override
-        public void receivedRoleReply(
-                DeviceId deviceId, MastershipRole requested, MastershipRole response) {
+        public void receivedRoleReply(DeviceId deviceId, MastershipRole requested,
+                                      MastershipRole response) {
             // Several things can happen here:
             // 1. request and response match
             // 2. request and response don't match
@@ -436,7 +437,7 @@
             // FIXME: implement response to this notification
 
             log.info("got reply to a role request for {}: asked for {}, and got {}",
-                    deviceId, requested, response);
+                     deviceId, requested, response);
 
             if (requested == null && response == null) {
                 // something was off with DeviceProvider, maybe check channel too?
@@ -445,9 +446,8 @@
                 return;
             }
 
-            if (requested.equals(response)) {
-                if (requested.equals(mastershipService.getLocalRole(deviceId))) {
-
+            if (Objects.equals(requested, response)) {
+                if (Objects.equals(requested, mastershipService.getLocalRole(deviceId))) {
                     return;
                 } else {
                     return;
@@ -464,19 +464,16 @@
                     //post(new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, device));
                 }
             }
-
         }
 
         @Override
         public void updatePortStatistics(DeviceId deviceId, Collection<PortStatistics> portStatistics) {
             checkNotNull(deviceId, DEVICE_ID_NULL);
-            checkNotNull(portStatistics,
-                    "Port statistics list cannot be null");
+            checkNotNull(portStatistics, "Port statistics list cannot be null");
             checkValidity();
 
             DeviceEvent event = store.updatePortStatistics(this.provider().id(),
-                    deviceId, portStatistics);
-
+                                                           deviceId, portStatistics);
             post(event);
         }
     }
@@ -634,8 +631,7 @@
     }
 
     // Store delegate to re-post events emitted from the store.
-    private class InternalStoreDelegate
-            implements DeviceStoreDelegate {
+    private class InternalStoreDelegate implements DeviceStoreDelegate {
         @Override
         public void notify(DeviceEvent event) {
             post(event);