implementing flowremoved handling
diff --git a/openflow/ctl/src/main/java/org/onlab/onos/of/controller/impl/OpenFlowControllerImpl.java b/openflow/ctl/src/main/java/org/onlab/onos/of/controller/impl/OpenFlowControllerImpl.java
index a8b0673..a119be3 100644
--- a/openflow/ctl/src/main/java/org/onlab/onos/of/controller/impl/OpenFlowControllerImpl.java
+++ b/openflow/ctl/src/main/java/org/onlab/onos/of/controller/impl/OpenFlowControllerImpl.java
@@ -1,8 +1,14 @@
 package org.onlab.onos.of.controller.impl;
 
+import static org.onlab.util.Tools.namedThreads;
+
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -13,6 +19,7 @@
 import org.onlab.onos.of.controller.DefaultOpenFlowPacketContext;
 import org.onlab.onos.of.controller.Dpid;
 import org.onlab.onos.of.controller.OpenFlowController;
+import org.onlab.onos.of.controller.OpenFlowEventListener;
 import org.onlab.onos.of.controller.OpenFlowPacketContext;
 import org.onlab.onos.of.controller.OpenFlowSwitch;
 import org.onlab.onos.of.controller.OpenFlowSwitchListener;
@@ -22,10 +29,12 @@
 import org.projectfloodlight.openflow.protocol.OFMessage;
 import org.projectfloodlight.openflow.protocol.OFPacketIn;
 import org.projectfloodlight.openflow.protocol.OFPortStatus;
+import org.projectfloodlight.openflow.protocol.OFType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 
 @Component(immediate = true)
@@ -35,6 +44,10 @@
     private static final Logger log =
             LoggerFactory.getLogger(OpenFlowControllerImpl.class);
 
+    private final ExecutorService executor = Executors.newFixedThreadPool(16,
+            namedThreads("of-event-dispatch-%d"));
+
+
     protected ConcurrentHashMap<Dpid, OpenFlowSwitch> connectedSwitches =
             new ConcurrentHashMap<Dpid, OpenFlowSwitch>();
     protected ConcurrentHashMap<Dpid, OpenFlowSwitch> activeMasterSwitches =
@@ -43,11 +56,12 @@
             new ConcurrentHashMap<Dpid, OpenFlowSwitch>();
 
     protected OpenFlowSwitchAgent agent = new OpenFlowSwitchAgent();
-    protected Set<OpenFlowSwitchListener> ofEventListener = new HashSet<>();
+    protected Set<OpenFlowSwitchListener> ofSwitchListener = new HashSet<>();
 
     protected Multimap<Integer, PacketListener> ofPacketListener =
             ArrayListMultimap.create();
 
+    protected Map<OFType, List<OpenFlowEventListener>> ofEventListener = Maps.newHashMap();
 
     private final Controller ctrl = new Controller();
 
@@ -93,14 +107,14 @@
 
     @Override
     public void addListener(OpenFlowSwitchListener listener) {
-        if (!ofEventListener.contains(listener)) {
-            this.ofEventListener.add(listener);
+        if (!ofSwitchListener.contains(listener)) {
+            this.ofSwitchListener.add(listener);
         }
     }
 
     @Override
     public void removeListener(OpenFlowSwitchListener listener) {
-        this.ofEventListener.remove(listener);
+        this.ofSwitchListener.remove(listener);
     }
 
     @Override
@@ -122,7 +136,7 @@
     public void processPacket(Dpid dpid, OFMessage msg) {
         switch (msg.getType()) {
         case PORT_STATUS:
-            for (OpenFlowSwitchListener l : ofEventListener) {
+            for (OpenFlowSwitchListener l : ofSwitchListener) {
                 l.portChanged(dpid, (OFPortStatus) msg);
             }
             break;
@@ -134,6 +148,12 @@
                 p.handlePacket(pktCtx);
             }
             break;
+        case FLOW_REMOVED:
+        case ERROR:
+        case STATS_REPLY:
+        case BARRIER_REPLY:
+            executor.submit(new OFMessageHandler(dpid, msg));
+            break;
         default:
             log.warn("Handling message type {} not yet implemented {}",
                     msg.getType(), msg);
@@ -164,7 +184,7 @@
             } else {
                 log.error("Added switch {}", dpid);
                 connectedSwitches.put(dpid, sw);
-                for (OpenFlowSwitchListener l : ofEventListener) {
+                for (OpenFlowSwitchListener l : ofSwitchListener) {
                     l.switchAdded(dpid);
                 }
                 return true;
@@ -277,7 +297,7 @@
             if (sw == null) {
                 sw = activeEqualSwitches.remove(dpid);
             }
-            for (OpenFlowSwitchListener l : ofEventListener) {
+            for (OpenFlowSwitchListener l : ofSwitchListener) {
                 l.switchRemoved(dpid);
             }
         }
@@ -288,5 +308,36 @@
         }
     }
 
+    /**
+     * Task run on the event-dispatch executor: hands one OpenFlow message to
+     * every listener registered in ofEventListener for that message's type.
+     */
+    private final class OFMessageHandler implements Runnable {
+
+        private final OFMessage msg;
+        private final Dpid dpid;
+
+        public OFMessageHandler(Dpid dpid, OFMessage msg) {
+            this.msg = msg;
+            this.dpid = dpid;
+        }
+
+        @Override
+        public void run() {
+            // Look up by the message's own type: this handler is also submitted
+            // for ERROR, STATS_REPLY and BARRIER_REPLY, not only FLOW_REMOVED.
+            List<OpenFlowEventListener> listeners =
+                    ofEventListener.get(msg.getType());
+            if (listeners == null) {
+                // Map.get returns null when nothing is registered for the type.
+                return;
+            }
+            for (OpenFlowEventListener listener : listeners) {
+                listener.handleMessage(dpid, msg);
+            }
+        }
+
+    }
+
 
 }