[ONOS-6358] Further improve LISP control message processing speed
Change-Id: Idf244a6fcda59bd8ac92d96d08066b2520b210af
diff --git a/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispChannelHandler.java b/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispChannelHandler.java
index 8efbff9..ac72d57 100644
--- a/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispChannelHandler.java
+++ b/protocols/lisp/ctl/src/main/java/org/onosproject/lisp/ctl/impl/LispChannelHandler.java
@@ -34,8 +34,11 @@
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import static org.onlab.packet.IpAddress.valueOf;
+import static org.onlab.util.Tools.groupedThreads;
/**
* Channel handler deals with the xTR connection and dispatches xTR messages
@@ -47,58 +50,13 @@
private final LispRouterFactory routerFactory = LispRouterFactory.getInstance();
- private LispRouter router;
+ protected ExecutorService executorMessages =
+ Executors.newFixedThreadPool(32, groupedThreads("onos/lisp", "message-stats-%d", log));
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- try {
-
- // process map-request message that is encapsulated in ECM
- if (msg instanceof LispEncapsulatedControl) {
- LispMessage innerMsg = extractLispMessage((LispEncapsulatedControl) msg);
- if (innerMsg instanceof LispMapRequest) {
- LispMapResolver mapResolver = LispMapResolver.getInstance();
- List<LispMessage> lispMessages =
- mapResolver.processMapRequest((LispEncapsulatedControl) msg);
-
- if (lispMessages != null) {
- lispMessages.forEach(ctx::writeAndFlush);
- }
- }
- }
-
- // process map-register message
- if (msg instanceof LispMapRegister) {
-
- LispMapRegister register = (LispMapRegister) msg;
- IpAddress xtrAddress = valueOf(register.getSender().getAddress());
- router = routerFactory.getRouterInstance(xtrAddress);
- router.setChannel(ctx.channel());
- router.connectRouter();
- router.handleMessage(register);
-
- LispMapServer mapServer = LispMapServer.getInstance();
- LispMapNotify mapNotify = mapServer.processMapRegister(register);
-
- if (mapNotify != null) {
- ctx.writeAndFlush(mapNotify);
- }
- }
-
- // process info-request message
- if (msg instanceof LispInfoRequest) {
- LispMapServer mapServer = LispMapServer.getInstance();
- LispInfoReply infoReply = mapServer.processInfoRequest((LispInfoRequest) msg);
-
- if (infoReply != null) {
- ctx.writeAndFlush(infoReply);
- }
- }
- } finally {
- // try to remove the received message form the buffer
- ReferenceCountUtil.release(msg);
- }
+ executorMessages.execute(new LispMessageHandler(ctx, (LispMessage) msg));
}
@Override
@@ -139,4 +97,70 @@
message.configSender(ecm.getSender());
return message;
}
+
+ /**
+ * LISP message handler.
+ */
+ private final class LispMessageHandler implements Runnable {
+
+ protected final ChannelHandlerContext ctx;
+ protected final LispMessage msg;
+ private LispRouter router;
+
+ public LispMessageHandler(ChannelHandlerContext ctx, LispMessage msg) {
+ this.ctx = ctx;
+ this.msg = msg;
+ }
+
+ @Override
+ public void run() {
+
+ try {
+ // process map-request message that is encapsulated in ECM
+ if (msg instanceof LispEncapsulatedControl) {
+ LispMessage innerMsg = extractLispMessage((LispEncapsulatedControl) msg);
+ if (innerMsg instanceof LispMapRequest) {
+ LispMapResolver mapResolver = LispMapResolver.getInstance();
+ List<LispMessage> lispMessages =
+ mapResolver.processMapRequest(msg);
+
+ if (lispMessages != null) {
+ lispMessages.forEach(ctx::writeAndFlush);
+ }
+ }
+ }
+
+ // process map-register message
+ if (msg instanceof LispMapRegister) {
+
+ LispMapRegister register = (LispMapRegister) msg;
+ IpAddress xtrAddress = valueOf(register.getSender().getAddress());
+ router = routerFactory.getRouterInstance(xtrAddress);
+ router.setChannel(ctx.channel());
+ router.connectRouter();
+ router.handleMessage(register);
+
+ LispMapServer mapServer = LispMapServer.getInstance();
+ LispMapNotify mapNotify = mapServer.processMapRegister(register);
+
+ if (mapNotify != null) {
+ ctx.writeAndFlush(mapNotify);
+ }
+ }
+
+ // process info-request message
+ if (msg instanceof LispInfoRequest) {
+ LispMapServer mapServer = LispMapServer.getInstance();
+ LispInfoReply infoReply = mapServer.processInfoRequest(msg);
+
+ if (infoReply != null) {
+ ctx.writeAndFlush(infoReply);
+ }
+ }
+ } finally {
+ // try to remove the received message form the buffer
+ ReferenceCountUtil.release(msg);
+ }
+ }
+ }
}