Implement FLOW_REMOVED handling
diff --git a/openflow/ctl/src/main/java/org/onlab/onos/of/controller/impl/OpenFlowControllerImpl.java b/openflow/ctl/src/main/java/org/onlab/onos/of/controller/impl/OpenFlowControllerImpl.java
index a8b0673..a119be3 100644
--- a/openflow/ctl/src/main/java/org/onlab/onos/of/controller/impl/OpenFlowControllerImpl.java
+++ b/openflow/ctl/src/main/java/org/onlab/onos/of/controller/impl/OpenFlowControllerImpl.java
@@ -1,8 +1,14 @@
package org.onlab.onos.of.controller.impl;
+import static org.onlab.util.Tools.namedThreads;
+
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -13,6 +19,7 @@
import org.onlab.onos.of.controller.DefaultOpenFlowPacketContext;
import org.onlab.onos.of.controller.Dpid;
import org.onlab.onos.of.controller.OpenFlowController;
+import org.onlab.onos.of.controller.OpenFlowEventListener;
import org.onlab.onos.of.controller.OpenFlowPacketContext;
import org.onlab.onos.of.controller.OpenFlowSwitch;
import org.onlab.onos.of.controller.OpenFlowSwitchListener;
@@ -22,10 +29,12 @@
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFPacketIn;
import org.projectfloodlight.openflow.protocol.OFPortStatus;
+import org.projectfloodlight.openflow.protocol.OFType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@Component(immediate = true)
@@ -35,6 +44,10 @@
private static final Logger log =
LoggerFactory.getLogger(OpenFlowControllerImpl.class);
+ private final ExecutorService executor = Executors.newFixedThreadPool(16,
+ namedThreads("of-event-dispatch-%d"));
+
+
protected ConcurrentHashMap<Dpid, OpenFlowSwitch> connectedSwitches =
new ConcurrentHashMap<Dpid, OpenFlowSwitch>();
protected ConcurrentHashMap<Dpid, OpenFlowSwitch> activeMasterSwitches =
@@ -43,11 +56,12 @@
new ConcurrentHashMap<Dpid, OpenFlowSwitch>();
protected OpenFlowSwitchAgent agent = new OpenFlowSwitchAgent();
- protected Set<OpenFlowSwitchListener> ofEventListener = new HashSet<>();
+ protected Set<OpenFlowSwitchListener> ofSwitchListener = new HashSet<>();
protected Multimap<Integer, PacketListener> ofPacketListener =
ArrayListMultimap.create();
+ protected Map<OFType, List<OpenFlowEventListener>> ofEventListener = Maps.newHashMap();
private final Controller ctrl = new Controller();
@@ -93,14 +107,14 @@
@Override
public void addListener(OpenFlowSwitchListener listener) {
- if (!ofEventListener.contains(listener)) {
- this.ofEventListener.add(listener);
+ if (!ofSwitchListener.contains(listener)) {
+ this.ofSwitchListener.add(listener);
}
}
@Override
public void removeListener(OpenFlowSwitchListener listener) {
- this.ofEventListener.remove(listener);
+ this.ofSwitchListener.remove(listener);
}
@Override
@@ -122,7 +136,7 @@
public void processPacket(Dpid dpid, OFMessage msg) {
switch (msg.getType()) {
case PORT_STATUS:
- for (OpenFlowSwitchListener l : ofEventListener) {
+ for (OpenFlowSwitchListener l : ofSwitchListener) {
l.portChanged(dpid, (OFPortStatus) msg);
}
break;
@@ -134,6 +148,12 @@
p.handlePacket(pktCtx);
}
break;
+ case FLOW_REMOVED:
+ case ERROR:
+ case STATS_REPLY:
+ case BARRIER_REPLY:
+ executor.submit(new OFMessageHandler(dpid, msg));
+ break;
default:
log.warn("Handling message type {} not yet implemented {}",
msg.getType(), msg);
@@ -164,7 +184,7 @@
} else {
log.error("Added switch {}", dpid);
connectedSwitches.put(dpid, sw);
- for (OpenFlowSwitchListener l : ofEventListener) {
+ for (OpenFlowSwitchListener l : ofSwitchListener) {
l.switchAdded(dpid);
}
return true;
@@ -277,7 +297,7 @@
if (sw == null) {
sw = activeEqualSwitches.remove(dpid);
}
- for (OpenFlowSwitchListener l : ofEventListener) {
+ for (OpenFlowSwitchListener l : ofSwitchListener) {
l.switchRemoved(dpid);
}
}
@@ -288,5 +308,42 @@
}
}
+    /**
+     * Task run on the event-dispatch executor that delivers a single OpenFlow
+     * message to the listeners registered for that message's type.
+     */
+    private final class OFMessageHandler implements Runnable {
+
+        private final OFMessage msg;
+        private final Dpid dpid;
+
+        /**
+         * Creates a handler for one message.
+         *
+         * @param dpid the Dpid handed to each listener together with the message
+         * @param msg  message to deliver
+         */
+        public OFMessageHandler(Dpid dpid, OFMessage msg) {
+            this.msg = msg;
+            this.dpid = dpid;
+        }
+
+        @Override
+        public void run() {
+            // Look up by the message's own type: this handler is also submitted
+            // for ERROR, STATS_REPLY and BARRIER_REPLY, not only FLOW_REMOVED.
+            List<OpenFlowEventListener> listeners =
+                    ofEventListener.get(msg.getType());
+            if (listeners == null) {
+                // Map.get returns null when nothing is registered for the type.
+                return;
+            }
+            for (OpenFlowEventListener listener : listeners) {
+                listener.handleMessage(dpid, msg);
+            }
+        }
+
+    }
+
}