Avoids disconnection from the switches
- Restarting the dispatcher if the queue is full
- Restarting the dispatcher during idle events
- Checking, during idle events, if the backlog can be drained
- Reducing the interval of idle events
Does not change the behavior of the state machine
Change-Id: I1721d8fad37e4e833d0fdfd12d51dc51a06559d0
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
index b2fc126..0eb2bec 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
@@ -152,8 +152,12 @@
*/
private int handshakeTransactionIds = -1;
+ // Each IDLE_INTERVAL (see OFChannelInitializer) we will perform a sanity check
+ // Idle timeout actions will be performed each MAX_IDLE_RETRY * IDLE_INTERVAL
+ private static final int MAX_IDLE_RETRY = 4;
+ private int maxIdleRetry = MAX_IDLE_RETRY;
-
+ // Dispatcher buffer/read size
private static final int MSG_READ_BUFFER = 5000;
/**
@@ -1366,19 +1370,44 @@
return getSwitchInfoString();
}
+ // We have reduced the idle period, the idea is to use
+ // the IdleHandler to perform also some sanity checks.
+ // Previous code is still executed with the same frequency
+ // which is IDLE_INTERVAL * MAX_IDLE_RETRY of inactivity
private void channelIdle(ChannelHandlerContext ctx,
IdleStateEvent e)
throws IOException {
- // Factory can be null if the channel goes idle during initial handshake. Since the switch
- // is not even initialized properly, we just skip this and disconnect the channel.
- if (factory != null) {
- OFMessage m = factory.buildEchoRequest().build();
- log.debug("Sending Echo Request on idle channel: {}", ctx.channel());
- ctx.writeAndFlush(Collections.singletonList(m), ctx.voidPromise());
- // XXX S some problems here -- echo request has no transaction id, and
- // echo reply is not correlated to the echo request.
+ // dispatcher terminated for some reason, restart
+ if (dispatcherHandle.isDone()) {
+ dispatcherHandle = dispatcher.submit(new Dispatcher());
}
- state.processIdle(this);
+ // drain the backlog
+ while (!dispatchBacklog.isEmpty()) {
+ OFMessage msg = dispatchBacklog.pop();
+ // move packet to the dispatcher queue
+ if (!dispatchQueue.offer(msg)) {
+ // queue full
+ channel.config().setAutoRead(false);
+ // put it back to the head of backlog
+ dispatchBacklog.addFirst(msg);
+ break;
+ }
+ }
+ // Original timeout reached
+ if (--maxIdleRetry == 0) {
+ maxIdleRetry = MAX_IDLE_RETRY;
+ // Factory can be null if the channel goes idle during initial handshake. Since the switch
+ // is not even initialized properly, we just skip this and disconnect the channel.
+ if (factory != null) {
+ // send an echo request each time idle_timeout * TICK
+ OFMessage m = factory.buildEchoRequest().build();
+ log.info("Sending Echo Request on idle channel: {}", ctx.channel());
+ // XXX S some problems here -- echo request has no transaction id, and
+ // echo reply is not correlated to the echo request.
+ ctx.writeAndFlush(Collections.singletonList(m), ctx.voidPromise());
+ }
+ state.processIdle(this);
+ }
}
@Override
@@ -1387,7 +1416,7 @@
throws Exception {
// If the connection is READER/WRITER idle try to send an echo request
if (evt instanceof IdleStateEvent) {
- log.info("Channel {} is {}", ctx.channel(), ((IdleStateEvent) evt).state());
+ log.debug("Channel {} is {}", ctx.channel(), ((IdleStateEvent) evt).state());
channelIdle(ctx, (IdleStateEvent) evt);
} else {
super.userEventTriggered(ctx, evt);
@@ -1400,6 +1429,7 @@
Object msg) throws Exception {
boolean release = true;
+ maxIdleRetry = MAX_IDLE_RETRY;
try {
if (msg instanceof OFMessage) {
// channelRead0 inlined
@@ -1431,62 +1461,73 @@
}
private void dispatchMessage(OFMessage m) {
-
+ // backlog is empty
if (dispatchBacklog.isEmpty()) {
if (!dispatchQueue.offer(m)) {
// queue full
channel.config().setAutoRead(false);
// put it on the head of backlog
dispatchBacklog.addFirst(m);
+ // dispatcher terminated for some reason, restart
+ if (dispatcherHandle.isDone()) {
+ dispatcherHandle = dispatcher.submit(new Dispatcher());
+ }
return;
}
} else {
dispatchBacklog.addLast(m);
}
-
+ // drain the backlog
while (!dispatchBacklog.isEmpty()) {
OFMessage msg = dispatchBacklog.pop();
-
+ // move packet to the dispatcher queue
if (!dispatchQueue.offer(msg)) {
// queue full
channel.config().setAutoRead(false);
// put it back to the head of backlog
dispatchBacklog.addFirst(msg);
+ // dispatcher terminated for some reason, restart
+ if (dispatcherHandle.isDone()) {
+ dispatcherHandle = dispatcher.submit(new Dispatcher());
+ }
return;
}
}
-
-
+ // dispatcher terminated for some reason, restart
if (dispatcherHandle.isDone()) {
- // dispatcher terminated for some reason, restart
+ dispatcherHandle = dispatcher.submit(new Dispatcher());
+ }
+ }
- dispatcherHandle = dispatcher.submit((Runnable) () -> {
- try {
- List<OFMessage> msgs = new ArrayList<>();
- for (;;) {
- // wait for new message
- OFMessage msg = dispatchQueue.take();
- sw.handleMessage(msg);
+ private final class Dispatcher implements Runnable {
+ // dispatch loop
+ @Override
+ public void run() {
+ try {
+ List<OFMessage> msgs = new ArrayList<>();
+ for (;;) {
+ // wait for new message
+ OFMessage msg = dispatchQueue.take();
+ sw.handleMessage(msg);
- while (dispatchQueue.drainTo(msgs, MSG_READ_BUFFER) > 0) {
- if (!channel.config().isAutoRead()) {
- channel.config().setAutoRead(true);
- }
- msgs.forEach(sw::handleMessage);
- msgs.clear();
- }
-
+ while (dispatchQueue.drainTo(msgs, MSG_READ_BUFFER) > 0) {
if (!channel.config().isAutoRead()) {
channel.config().setAutoRead(true);
}
+ msgs.forEach(sw::handleMessage);
+ msgs.clear();
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- // interrupted. gracefully shutting down
- return;
- }
- });
+ if (!channel.config().isAutoRead()) {
+ channel.config().setAutoRead(true);
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // interrupted. gracefully shutting down
+ log.warn("Dispatcher thread InterruptedException:", e);
+
+ }
}
}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java
index b03f007..e636835 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java
@@ -76,7 +76,7 @@
pipeline.addLast("ofmessageencoder", OFMessageEncoder.getInstance());
pipeline.addLast("ofmessagedecoder", OFMessageDecoder.getInstance());
- pipeline.addLast("idle", new IdleStateHandler(20, 25, 0));
+ pipeline.addLast("idle", new IdleStateHandler(5, 25, 0));
pipeline.addLast("timeout", new ReadTimeoutHandler(30));
// XXX S ONOS: was 15 increased it to fix Issue #296