Avoids disconnection from the switches

- Restarting the dispatcher if the queue is full
- Restarting the dispatcher during idle events
- Checking, during idle events, if the backlog can be drained
- Reducing the interval of idle events

Does not change the behavior of the state machine

Change-Id: I1721d8fad37e4e833d0fdfd12d51dc51a06559d0
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
index b2fc126..0eb2bec 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
@@ -152,8 +152,12 @@
      */
     private int handshakeTransactionIds = -1;
 
+    // Each IDLE_INTERVAL (see OFChannelInitializer) we will perform a sanity check
+    // Idle timeout actions will be performed each MAX_IDLE_RETRY * IDLE_INTERVAL
+    private static final int MAX_IDLE_RETRY = 4;
+    private int maxIdleRetry = MAX_IDLE_RETRY;
 
-
+    // Dispatcher buffer/read size
     private static final int MSG_READ_BUFFER = 5000;
 
     /**
@@ -1366,19 +1370,44 @@
         return getSwitchInfoString();
     }
 
+    // We have reduced the idle period, the idea is to use
+    // the IdleHandler to perform also some sanity checks.
+    // Previous code is still executed with the same frequency
+    // which is IDLE_INTERVAL * MAX_IDLE_RETRY of inactivity
     private void channelIdle(ChannelHandlerContext ctx,
                                IdleStateEvent e)
             throws IOException {
-        // Factory can be null if the channel goes idle during initial handshake. Since the switch
-        // is not even initialized properly, we just skip this and disconnect the channel.
-        if (factory != null) {
-            OFMessage m = factory.buildEchoRequest().build();
-            log.debug("Sending Echo Request on idle channel: {}", ctx.channel());
-            ctx.writeAndFlush(Collections.singletonList(m), ctx.voidPromise());
-            // XXX S some problems here -- echo request has no transaction id, and
-            // echo reply is not correlated to the echo request.
+        // dispatcher terminated for some reason, restart
+        if (dispatcherHandle.isDone()) {
+            dispatcherHandle = dispatcher.submit(new Dispatcher());
         }
-        state.processIdle(this);
+        // drain the backlog
+        while (!dispatchBacklog.isEmpty()) {
+            OFMessage msg = dispatchBacklog.pop();
+            // move packet to the dispatcher queue
+            if (!dispatchQueue.offer(msg)) {
+                // queue full
+                channel.config().setAutoRead(false);
+                // put it back to the head of backlog
+                dispatchBacklog.addFirst(msg);
+                break;
+            }
+        }
+        // Original timeout reached
+        if (--maxIdleRetry == 0) {
+            maxIdleRetry = MAX_IDLE_RETRY;
+            // Factory can be null if the channel goes idle during initial handshake. Since the switch
+            // is not even initialized properly, we just skip this and disconnect the channel.
+            if (factory != null) {
+                // send an echo request each time idle_timeout * TICK
+                OFMessage m = factory.buildEchoRequest().build();
+                log.info("Sending Echo Request on idle channel: {}", ctx.channel());
+                // XXX S some problems here -- echo request has no transaction id, and
+                // echo reply is not correlated to the echo request.
+                ctx.writeAndFlush(Collections.singletonList(m), ctx.voidPromise());
+            }
+            state.processIdle(this);
+        }
     }
 
     @Override
@@ -1387,7 +1416,7 @@
             throws Exception {
         // If the connection is READER/WRITER idle try to send an echo request
         if (evt instanceof IdleStateEvent) {
-            log.info("Channel {} is {}", ctx.channel(), ((IdleStateEvent) evt).state());
+            log.debug("Channel {} is {}", ctx.channel(), ((IdleStateEvent) evt).state());
             channelIdle(ctx, (IdleStateEvent) evt);
         } else {
             super.userEventTriggered(ctx, evt);
@@ -1400,6 +1429,7 @@
                             Object msg) throws Exception {
 
         boolean release = true;
+        maxIdleRetry = MAX_IDLE_RETRY;
         try {
             if (msg instanceof OFMessage) {
                 // channelRead0 inlined
@@ -1431,62 +1461,73 @@
     }
 
     private void dispatchMessage(OFMessage m) {
-
+        // backlog is empty
         if (dispatchBacklog.isEmpty()) {
             if (!dispatchQueue.offer(m)) {
                 // queue full
                 channel.config().setAutoRead(false);
                 // put it on the head of backlog
                 dispatchBacklog.addFirst(m);
+                // dispatcher terminated for some reason, restart
+                if (dispatcherHandle.isDone()) {
+                    dispatcherHandle = dispatcher.submit(new Dispatcher());
+                }
                 return;
             }
         } else {
             dispatchBacklog.addLast(m);
         }
-
+        // drain the backlog
         while (!dispatchBacklog.isEmpty()) {
             OFMessage msg = dispatchBacklog.pop();
-
+            // move packet to the dispatcher queue
             if (!dispatchQueue.offer(msg)) {
                 // queue full
                 channel.config().setAutoRead(false);
                 // put it back to the head of backlog
                 dispatchBacklog.addFirst(msg);
+                // dispatcher terminated for some reason, restart
+                if (dispatcherHandle.isDone()) {
+                    dispatcherHandle = dispatcher.submit(new Dispatcher());
+                }
                 return;
             }
         }
-
-
+        // dispatcher terminated for some reason, restart
         if (dispatcherHandle.isDone()) {
-            // dispatcher terminated for some reason, restart
+            dispatcherHandle = dispatcher.submit(new Dispatcher());
+        }
+    }
 
-            dispatcherHandle = dispatcher.submit((Runnable) () -> {
-                try {
-                    List<OFMessage> msgs = new ArrayList<>();
-                    for (;;) {
-                        // wait for new message
-                        OFMessage msg = dispatchQueue.take();
-                        sw.handleMessage(msg);
+    private final class Dispatcher implements Runnable {
+        // dispatch loop
+        @Override
+        public void run() {
+            try {
+                List<OFMessage> msgs = new ArrayList<>();
+                for (;;) {
+                    // wait for new message
+                    OFMessage msg = dispatchQueue.take();
+                    sw.handleMessage(msg);
 
-                        while (dispatchQueue.drainTo(msgs, MSG_READ_BUFFER) > 0) {
-                            if (!channel.config().isAutoRead()) {
-                                channel.config().setAutoRead(true);
-                            }
-                            msgs.forEach(sw::handleMessage);
-                            msgs.clear();
-                        }
-
+                    while (dispatchQueue.drainTo(msgs, MSG_READ_BUFFER) > 0) {
                         if (!channel.config().isAutoRead()) {
                             channel.config().setAutoRead(true);
                         }
+                        msgs.forEach(sw::handleMessage);
+                        msgs.clear();
                     }
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    // interrupted. gracefully shutting down
-                    return;
-                }
 
-            });
+                    if (!channel.config().isAutoRead()) {
+                        channel.config().setAutoRead(true);
+                    }
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                // interrupted. gracefully shutting down
+                log.warn("Dispatcher thread InterruptedException:", e);
+
+            }
         }
     }
 
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java
index b03f007..e636835 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java
@@ -76,7 +76,7 @@
         pipeline.addLast("ofmessageencoder", OFMessageEncoder.getInstance());
         pipeline.addLast("ofmessagedecoder", OFMessageDecoder.getInstance());
 
-        pipeline.addLast("idle", new IdleStateHandler(20, 25, 0));
+        pipeline.addLast("idle", new IdleStateHandler(5, 25, 0));
         pipeline.addLast("timeout", new ReadTimeoutHandler(30));
 
         // XXX S ONOS: was 15 increased it to fix Issue #296