Moving Openflow executors from submit to execute
Change-Id: I446747c7b28d2562ff14afe7e898cab8a83a14b7
diff --git a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
index 182935a..3a999a2 100644
--- a/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
+++ b/protocols/openflow/api/src/main/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitch.java
@@ -95,7 +95,7 @@
protected Set<OpenFlowEventListener> ofOutgoingMsgListener = new CopyOnWriteArraySet<>();
protected ExecutorService executorMsgs =
- Executors.newCachedThreadPool(groupedThreads("onos/of", "event-outgoing-msg-stats-%d"));
+ Executors.newCachedThreadPool(groupedThreads("onos/of", "event-outgoing-msg-stats-%d", log));
// messagesPendingMastership is used as synchronization variable for
// all mastership related changes. In this block, mastership (including
@@ -173,7 +173,7 @@
if (m.getType() == OFType.PACKET_OUT ||
m.getType() == OFType.FLOW_MOD ||
m.getType() == OFType.STATS_REQUEST) {
- executorMsgs.submit(new OFMessageHandler(dpid, m));
+ executorMsgs.execute(new OFMessageHandler(dpid, m));
}
});
}
diff --git a/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitchTest.java b/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitchTest.java
index 288f141..fb8d977 100644
--- a/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitchTest.java
+++ b/protocols/openflow/api/src/test/java/org/onosproject/openflow/controller/driver/AbstractOpenFlowSwitchTest.java
@@ -15,15 +15,15 @@
*/
package org.onosproject.openflow.controller.driver;
+import org.jboss.netty.channel.Channel;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.openflow.controller.Dpid;
import org.onosproject.openflow.controller.OpenFlowEventListener;
import org.projectfloodlight.openflow.protocol.OFMessage;
-import org.jboss.netty.channel.Channel;
+
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Future;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
@@ -48,11 +48,10 @@
}
@Override
- public Future<?> submit(Runnable task) {
+ public void execute(Runnable task) {
AbstractOpenFlowSwitch.OFMessageHandler handler =
(AbstractOpenFlowSwitch.OFMessageHandler) task;
submittedMessages.add(handler.msg);
- return null;
}
}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index eb90c26..84d2b9f 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -113,16 +113,16 @@
private int workerThreads = DEFAULT_WORKER_THREADS;
protected ExecutorService executorMsgs =
- Executors.newFixedThreadPool(32, groupedThreads("onos/of", "event-stats-%d"));
+ Executors.newFixedThreadPool(32, groupedThreads("onos/of", "event-stats-%d", log));
protected ExecutorService executorPacketIn =
- Executors.newCachedThreadPool(groupedThreads("onos/of", "event-pkt-in-stats-%d"));
+ Executors.newCachedThreadPool(groupedThreads("onos/of", "event-pkt-in-stats-%d", log));
protected ExecutorService executorFlowRemoved =
- Executors.newCachedThreadPool(groupedThreads("onos/of", "event-flow-removed-stats-%d"));
+ Executors.newCachedThreadPool(groupedThreads("onos/of", "event-flow-removed-stats-%d", log));
private final ExecutorService executorBarrier =
- Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d"));
+ Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d", log));
protected ConcurrentMap<Dpid, OpenFlowSwitch> connectedSwitches =
new ConcurrentHashMap<>();
@@ -286,19 +286,19 @@
p.handlePacket(pktCtx);
}
if (monitorAllEvents) {
- executorPacketIn.submit(new OFMessageHandler(dpid, msg));
+ executorPacketIn.execute(new OFMessageHandler(dpid, msg));
}
break;
// TODO: Consider using separate threadpool for sensitive messages.
// ie. Back to back error could cause us to starve.
case FLOW_REMOVED:
if (monitorAllEvents) {
- executorFlowRemoved.submit(new OFMessageHandler(dpid, msg));
+ executorFlowRemoved.execute(new OFMessageHandler(dpid, msg));
break;
}
case ERROR:
log.debug("Received error message from {}: {}", dpid, msg);
- executorMsgs.submit(new OFMessageHandler(dpid, msg));
+ executorMsgs.execute(new OFMessageHandler(dpid, msg));
break;
case STATS_REPLY:
OFStatsReply reply = (OFStatsReply) msg;
@@ -315,7 +315,7 @@
OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
rep.setEntries(Lists.newLinkedList(flowStats));
rep.setXid(reply.getXid());
- executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
+ executorMsgs.execute(new OFMessageHandler(dpid, rep.build()));
}
break;
case TABLE:
@@ -324,7 +324,7 @@
OFTableStatsReply.Builder rep =
OFFactories.getFactory(msg.getVersion()).buildTableStatsReply();
rep.setEntries(Lists.newLinkedList(tableStats));
- executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
+ executorMsgs.execute(new OFMessageHandler(dpid, rep.build()));
}
break;
case GROUP:
@@ -334,7 +334,7 @@
OFFactories.getFactory(msg.getVersion()).buildGroupStatsReply();
rep.setEntries(Lists.newLinkedList(groupStats));
rep.setXid(reply.getXid());
- executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
+ executorMsgs.execute(new OFMessageHandler(dpid, rep.build()));
}
break;
case GROUP_DESC:
@@ -345,14 +345,14 @@
OFFactories.getFactory(msg.getVersion()).buildGroupDescStatsReply();
rep.setEntries(Lists.newLinkedList(groupDescStats));
rep.setXid(reply.getXid());
- executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
+ executorMsgs.execute(new OFMessageHandler(dpid, rep.build()));
}
break;
case PORT:
- executorMsgs.submit(new OFMessageHandler(dpid, reply));
+ executorMsgs.execute(new OFMessageHandler(dpid, reply));
break;
case METER:
- executorMsgs.submit(new OFMessageHandler(dpid, reply));
+ executorMsgs.execute(new OFMessageHandler(dpid, reply));
break;
case EXPERIMENTER:
if (reply instanceof OFCalientFlowStatsReply) {
@@ -394,10 +394,10 @@
OFFlowStatsReply.Builder rep =
OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
rep.setEntries(Lists.newLinkedList(flowStats));
- executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
+ executorMsgs.execute(new OFMessageHandler(dpid, rep.build()));
}
} else {
- executorMsgs.submit(new OFMessageHandler(dpid, reply));
+ executorMsgs.execute(new OFMessageHandler(dpid, reply));
}
break;
default:
@@ -406,7 +406,7 @@
}
break;
case BARRIER_REPLY:
- executorBarrier.submit(new OFMessageHandler(dpid, msg));
+ executorBarrier.execute(new OFMessageHandler(dpid, msg));
break;
case EXPERIMENTER:
long experimenter = ((OFExperimenter) msg).getExperimenter();
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplPacketsTest.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplPacketsTest.java
index 3fef8dd..9f713ee 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplPacketsTest.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImplPacketsTest.java
@@ -15,21 +15,15 @@
*/
package org.onosproject.openflow.controller.impl;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Future;
-
import org.junit.Before;
import org.junit.Test;
import org.onosproject.openflow.ExecutorServiceAdapter;
-import org.onosproject.openflow.MockOfPortStatus;
-import org.onosproject.openflow.OpenFlowSwitchListenerAdapter;
-import org.onosproject.openflow.OpenflowSwitchDriverAdapter;
import org.onosproject.openflow.MockOfFeaturesReply;
import org.onosproject.openflow.MockOfPacketIn;
+import org.onosproject.openflow.MockOfPortStatus;
import org.onosproject.openflow.OfMessageAdapter;
+import org.onosproject.openflow.OpenFlowSwitchListenerAdapter;
+import org.onosproject.openflow.OpenflowSwitchDriverAdapter;
import org.onosproject.openflow.controller.Dpid;
import org.onosproject.openflow.controller.OpenFlowPacketContext;
import org.onosproject.openflow.controller.OpenFlowSwitch;
@@ -37,11 +31,14 @@
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFType;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
import static junit.framework.TestCase.fail;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.*;
/**
* Tests for packet processing in the open flow controller impl class.
@@ -85,11 +82,10 @@
}
@Override
- public Future<?> submit(Runnable task) {
+ public void execute(Runnable task) {
OpenFlowControllerImpl.OFMessageHandler handler =
(OpenFlowControllerImpl.OFMessageHandler) task;
submittedMessages.add(handler.msg);
- return null;
}
}