ONOS-3600 - use separate dispatch queues for different classes of events

Change-Id: I139a3f4eb58db233ac009d03664281eefe357157
diff --git a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
index e44709c..8f0114d 100644
--- a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
+++ b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
@@ -15,6 +15,15 @@
  */
 package org.onosproject.event.impl;
 
+import java.util.Map;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -25,16 +34,17 @@
 import org.onosproject.event.Event;
 import org.onosproject.event.EventDeliveryService;
 import org.onosproject.event.EventSink;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.host.HostEvent;
+import org.onosproject.net.intent.IntentEvent;
+import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.topology.TopologyEvent;
 import org.slf4j.Logger;
 
 import com.google.common.base.Stopwatch;
-
-import java.util.TimerTask;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
@@ -53,44 +63,58 @@
 
     private final Logger log = getLogger(getClass());
 
-    private boolean executionTimeLimit = false;
+
+    private DispatchLoop topologyDispatcher = new DispatchLoop("topology");
+    private DispatchLoop programmingDispatcher = new DispatchLoop("programming");
+    private DispatchLoop defaultDispatcher = new DispatchLoop("default");
+
+    private Map<Class, DispatchLoop> dispatcherMap =
+            new ImmutableMap.Builder<Class, DispatchLoop>()
+                .put(TopologyEvent.class, topologyDispatcher)
+                .put(DeviceEvent.class, topologyDispatcher)
+                .put(LinkEvent.class, topologyDispatcher)
+                .put(HostEvent.class, topologyDispatcher)
+                .put(FlowRuleEvent.class, programmingDispatcher)
+                .put(IntentEvent.class, programmingDispatcher)
+                .build();
+
+    private Set<DispatchLoop> dispatchers =
+            new ImmutableSet.Builder<DispatchLoop>()
+                .addAll(dispatcherMap.values())
+                .add(defaultDispatcher)
+                .build();
 
     // Default number of millis a sink can take to process an event.
     private static final long DEFAULT_EXECUTE_MS = 5_000; // ms
     private static final long WATCHDOG_MS = 250; // ms
 
-    private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
-
-    private final ExecutorService executor =
-            newSingleThreadExecutor(groupedThreads("onos/event", "dispatch-%d", log));
-
     @SuppressWarnings("unchecked")
     private static final Event KILL_PILL = new AbstractEvent(null, 0) {
     };
 
-    private volatile DispatchLoop dispatchLoop;
     private long maxProcessMillis = DEFAULT_EXECUTE_MS;
 
-    // Means to detect long-running sinks
-    private TimerTask watchdog;
-    private volatile EventSink lastSink;
-    private final Stopwatch stopwatch = Stopwatch.createUnstarted();
-    private volatile Future<?> dispatchFuture;
+    private DispatchLoop getDispatcher(Event event) {
+        DispatchLoop dispatcher = dispatcherMap.get(event.getClass());
+        if (dispatcher == null) {
+            dispatcher = defaultDispatcher;
+        }
+        return dispatcher;
+    }
 
     @Override
     public void post(Event event) {
-        if (!events.add(event)) {
+
+        if (!getDispatcher(event).add(event)) {
             log.error("Unable to post event {}", event);
         }
     }
 
     @Activate
     public void activate() {
-        dispatchLoop = new DispatchLoop();
-        dispatchFuture = executor.submit(dispatchLoop);
 
         if (maxProcessMillis != 0) {
-            startWatchdog();
+            dispatchers.forEach(DispatchLoop::startWatchdog);
         }
 
         log.info("Started");
@@ -98,25 +122,11 @@
 
     @Deactivate
     public void deactivate() {
-        dispatchLoop.stop();
-        stopWatchdog();
-        post(KILL_PILL);
+        dispatchers.forEach(DispatchLoop::stop);
+
         log.info("Stopped");
     }
 
-    private void startWatchdog() {
-        log.info("Starting watchdog task");
-        watchdog = new Watchdog();
-        SharedExecutors.getTimer().schedule(watchdog, WATCHDOG_MS, WATCHDOG_MS);
-    }
-
-    private void stopWatchdog() {
-        log.info("Stopping watchdog task");
-        if (watchdog != null) {
-            watchdog.cancel();
-        }
-    }
-
     @Override
     public void setDispatchTimeLimit(long millis) {
         checkPermission(EVENT_WRITE);
@@ -126,9 +136,9 @@
         maxProcessMillis = millis;
 
         if (millis == 0 && oldMillis != 0) {
-            stopWatchdog();
+            dispatchers.forEach(DispatchLoop::stopWatchdog);
         } else if (millis != 0 && oldMillis == 0) {
-            startWatchdog();
+            dispatchers.forEach(DispatchLoop::startWatchdog);
         }
     }
 
@@ -140,7 +150,28 @@
 
     // Auxiliary event dispatching loop that feeds off the events queue.
     private class DispatchLoop implements Runnable {
+        private final String name;
         private volatile boolean stopped;
+        private volatile EventSink lastSink;
+        // Means to detect long-running sinks
+        private final Stopwatch stopwatch = Stopwatch.createUnstarted();
+        private TimerTask watchdog;
+        private volatile Future<?> dispatchFuture;
+        private final BlockingQueue<Event> eventsQueue;
+        private final ExecutorService executor;
+
+        DispatchLoop(String name) {
+            this.name = name;
+            executor = newSingleThreadExecutor(
+                    groupedThreads("onos/event",
+                    "dispatch-" + name + "%d", log));
+            eventsQueue = new LinkedBlockingQueue<>();
+            dispatchFuture = executor.submit(this);
+        }
+
+        public boolean add(Event event) {
+            return eventsQueue.add(event);
+        }
 
         @Override
         public void run() {
@@ -149,7 +180,7 @@
             while (!stopped) {
                 try {
                     // Fetch the next event and if it is the kill-pill, bail
-                    Event event = events.take();
+                    Event event = eventsQueue.take();
                     if (event == KILL_PILL) {
                         break;
                     }
@@ -180,28 +211,46 @@
 
         void stop() {
             stopped = true;
+            stopWatchdog();
+            add(KILL_PILL);
         }
-    }
 
-    // Monitors event sinks to make sure none take too long to execute.
-    private class Watchdog extends TimerTask {
-        @Override
-        public void run() {
-            long elapsedTimeMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS);
-            if (elapsedTimeMillis > maxProcessMillis) {
-                stopwatch.reset();
-                log.warn("Event sink {} exceeded execution time limit: {} ms; spawning new dispatch loop",
-                          lastSink.getClass().getName(), elapsedTimeMillis);
+        // Monitors event sinks to make sure none take too long to execute.
+        private class Watchdog extends TimerTask {
+            @Override
+            public void run() {
+                long elapsedTimeMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+                if (elapsedTimeMillis > maxProcessMillis) {
+                    stopwatch.reset();
+                    log.warn("Event sink {} exceeded execution time limit: {} ms; " +
+                             "spawning new dispatch loop",
+                             lastSink.getClass().getName(), elapsedTimeMillis);
 
-                // Notify the sink that it has exceeded its time limit.
-                lastSink.onProcessLimit();
+                    // Notify the sink that it has exceeded its time limit.
+                    lastSink.onProcessLimit();
 
-                // Cancel the old dispatch loop and submit a new one.
-                dispatchLoop.stop();
-                dispatchLoop = new DispatchLoop();
+                    // Cancel the old dispatch loop and submit a new one.
+
+                stop();
                 dispatchFuture.cancel(true);
-                dispatchFuture = executor.submit(dispatchLoop);
+                dispatchFuture = executor.submit(this);
+                }
+            }
+        }
+
+        private void startWatchdog() {
+            log.info("Starting watchdog task for dispatcher {}", name);
+            watchdog = new Watchdog();
+            SharedExecutors.getTimer().schedule(watchdog, WATCHDOG_MS, WATCHDOG_MS);
+        }
+
+        private void stopWatchdog() {
+            log.info("Stopping watchdog task for dispatcher {}", name);
+            if (watchdog != null) {
+                watchdog.cancel();
             }
         }
     }
+
+
 }
diff --git a/core/net/src/test/java/org/onosproject/event/impl/CoreEventDispatcherTest.java b/core/net/src/test/java/org/onosproject/event/impl/CoreEventDispatcherTest.java
index 77b2d68..1538e16 100644
--- a/core/net/src/test/java/org/onosproject/event/impl/CoreEventDispatcherTest.java
+++ b/core/net/src/test/java/org/onosproject/event/impl/CoreEventDispatcherTest.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test of the event dispatcher mechanism.
@@ -76,6 +77,17 @@
         validate(prickleSink);
     }
 
+    @Test
+    public void postEventSinkTakesTooLong() throws Exception {
+        SinkProcessTakesTooLong takesTooLong = new SinkProcessTakesTooLong();
+        dispatcher.setDispatchTimeLimit(250);
+        dispatcher.addSink(TooLongEvent.class, takesTooLong);
+        takesTooLong.latch = new CountDownLatch(1);
+        dispatcher.post(new TooLongEvent("XYZZY"));
+        takesTooLong.latch.await(1000, TimeUnit.MILLISECONDS);
+        assertTrue(takesTooLong.interrupted);
+    }
+
     private void validate(Sink sink, String... strings) {
         int i = 0;
         assertEquals("incorrect event count", strings.length, sink.subjects.size());
@@ -129,4 +141,25 @@
         }
     }
 
+    private static class TooLongEvent extends AbstractEvent<Type, String> {
+        protected TooLongEvent(String subject) {
+            super(Type.FOO, subject);
+        }
+    }
+
+    private static class SinkProcessTakesTooLong
+                         implements EventSink<TooLongEvent> {
+        boolean interrupted = false;
+        CountDownLatch latch;
+
+        public void process(TooLongEvent event) {
+            try {
+                Thread.sleep(5 * 1000);
+            } catch (InterruptedException ie) {
+                interrupted = true;
+            }
+            latch.countDown();
+        }
+    }
+
 }