Moving the reactive forwarding task of black-hole fixing off the event thread.

Change-Id: I3b17dc9a50906a0e3698f20f88e77e01d1037737
diff --git a/apps/fwd/src/main/java/org/onosproject/fwd/ReactiveForwarding.java b/apps/fwd/src/main/java/org/onosproject/fwd/ReactiveForwarding.java
index 8ed55c6..09b8533 100644
--- a/apps/fwd/src/main/java/org/onosproject/fwd/ReactiveForwarding.java
+++ b/apps/fwd/src/main/java/org/onosproject/fwd/ReactiveForwarding.java
@@ -22,8 +22,8 @@
 import org.apache.felix.scr.annotations.Modified;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.Service;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.ICMP;
 import org.onlab.packet.ICMP6;
@@ -73,20 +73,24 @@
 import org.onosproject.net.topology.TopologyEvent;
 import org.onosproject.net.topology.TopologyListener;
 import org.onosproject.net.topology.TopologyService;
-import org.onosproject.store.service.StorageService;
-import org.osgi.service.component.ComponentContext;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.WallClockTimestamp;
 import org.onosproject.store.service.MultiValuedTimestamp;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
+
 import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -198,6 +202,8 @@
 
     private final TopologyListener topologyListener = new InternalTopologyListener();
 
+    private ExecutorService blackHoleExecutor;
+
 
     @Activate
     public void activate(ComponentContext context) {
@@ -212,6 +218,10 @@
                         MultiValuedTimestamp<>(new WallClockTimestamp(), System.nanoTime()))
                 .build();
 
+        blackHoleExecutor = newSingleThreadExecutor(groupedThreads("onos/app/fwd",
+                                                                   "black-hole-fixer",
+                                                                   log));
+
         cfgService.registerProperties(getClass());
         appId = coreService.registerApplication("org.onosproject.fwd");
 
@@ -230,6 +240,8 @@
         flowRuleService.removeFlowRulesById(appId);
         packetService.removeProcessor(processor);
         topologyService.removeListener(topologyListener);
+        blackHoleExecutor.shutdown();
+        blackHoleExecutor = null;
         processor = null;
         log.info("Stopped");
     }
@@ -719,8 +731,8 @@
                 reasons.forEach(re -> {
                     if (re instanceof LinkEvent) {
                         LinkEvent le = (LinkEvent) re;
-                        if (le.type() == LinkEvent.Type.LINK_REMOVED) {
-                            fixBlackhole(le.subject().src());
+                        if (le.type() == LinkEvent.Type.LINK_REMOVED && blackHoleExecutor != null) {
+                            blackHoleExecutor.submit(() -> fixBlackhole(le.subject().src()));
                         }
                     }
                 });