Add support for timing out idle FPM connections.

This requires support for sending keepalives on the Quagga side.

Change-Id: I8551eb17b51460f2a66a7086b5c8006cc0d214a5
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
index 01321c1..2ff2874 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
@@ -34,6 +34,8 @@
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.timeout.IdleStateHandler;
+import org.jboss.netty.util.HashedWheelTimer;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
 import org.onlab.util.KryoNamespace;
@@ -84,6 +86,7 @@
 
     private static final int FPM_PORT = 2620;
     private static final String APP_NAME = "org.onosproject.fpm";
+    private static final int IDLE_TIMEOUT_SECS = 5;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
@@ -163,19 +166,24 @@
     }
 
     private void startServer() {
+        HashedWheelTimer timer = new HashedWheelTimer(
+                groupedThreads("onos/fpm", "fpm-timer-%d", log));
+
         ChannelFactory channelFactory = new NioServerSocketChannelFactory(
                 newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d", log)),
                 newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d", log)));
         ChannelPipelineFactory pipelineFactory = () -> {
             // Allocate a new session per connection
+            IdleStateHandler idleHandler =
+                    new IdleStateHandler(timer, IDLE_TIMEOUT_SECS, 0, 0);
             FpmSessionHandler fpmSessionHandler =
                     new FpmSessionHandler(new InternalFpmListener());
-            FpmFrameDecoder fpmFrameDecoder =
-                    new FpmFrameDecoder();
+            FpmFrameDecoder fpmFrameDecoder = new FpmFrameDecoder();
 
             // Setup the processing pipeline
             ChannelPipeline pipeline = Channels.pipeline();
             pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder);
+            pipeline.addLast("idle", idleHandler);
             pipeline.addLast("FpmSession", fpmSessionHandler);
             return pipeline;
         };
@@ -210,6 +218,10 @@
     }
 
     private void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
+        if (fpmMessage.type() == FpmHeader.FPM_TYPE_KEEPALIVE) {
+            return;
+        }
+
         Netlink netlink = fpmMessage.netlink();
         RtNetlink rtNetlink = netlink.rtNetlink();
 
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
index f6e982f..1a75914 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
@@ -21,7 +21,9 @@
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
+import org.jboss.netty.handler.timeout.IdleStateEvent;
+import org.jboss.netty.handler.timeout.ReadTimeoutException;
 import org.onosproject.routing.fpm.protocol.FpmHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,7 +36,7 @@
 /**
  * Session handler for FPM protocol.
  */
-public class FpmSessionHandler extends SimpleChannelHandler {
+public class FpmSessionHandler extends IdleStateAwareChannelHandler {
 
     private static Logger log = LoggerFactory.getLogger(FpmSessionHandler.class);
 
@@ -43,6 +45,9 @@
     private Channel channel;
     private FpmPeer us;
 
+    private boolean useKeepalives;
+    private boolean initialized;
+
     /**
      * Class constructor.
      *
@@ -56,13 +61,35 @@
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
             throws Exception {
         FpmHeader fpmMessage = (FpmHeader) e.getMessage();
+
+        initConnection(ctx, fpmMessage);
+
         fpmListener.fpmMessage(us, fpmMessage);
     }
 
+    private void initConnection(ChannelHandlerContext ctx, FpmHeader message) {
+        if (!initialized) {
+            useKeepalives = message.version() >= FpmHeader.FPM_VERSION_ONOS_EXT;
+            if (useKeepalives) {
+                log.info("Using keepalives");
+            } else {
+                log.info("Not using keepalives");
+                // Remove the idle channel handler if using a protocol version
+                // with no keepalive messages
+                ctx.getPipeline().remove("idle");
+            }
+            initialized = true;
+        }
+    }
+
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
             throws Exception {
-        log.error("Exception thrown while handling FPM message", e.getCause());
+        if (e.getCause() instanceof ReadTimeoutException) {
+            log.warn("Haven't heard from FPM client for a while");
+        } else {
+            log.error("Exception thrown while handling FPM message", e.getCause());
+        }
         if (channel != null) {
             channel.close();
         }
@@ -111,4 +138,14 @@
         }
         channel = null;
     }
+
+    @Override
+    public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
+            throws Exception {
+        log.warn("FPM channel idle");
+        if (useKeepalives) {
+            ctx.getChannel().close();
+        }
+        super.channelIdle(ctx, e);
+    }
 }
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/FpmHeader.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/FpmHeader.java
index 5bba9fb..85ee9bc 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/FpmHeader.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/FpmHeader.java
@@ -17,6 +17,7 @@
 package org.onosproject.routing.fpm.protocol;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
 import org.onlab.packet.DeserializationException;
 
 import java.nio.ByteBuffer;
@@ -30,7 +31,17 @@
     public static final int FPM_HEADER_LENGTH = 4;
 
     public static final short FPM_VERSION_1 = 1;
+    public static final short FPM_VERSION_ONOS_EXT = 32;
+
+    private static final ImmutableSet<Short> SUPPORTED_VERSIONS =
+            ImmutableSet.<Short>builder()
+            .add(FPM_VERSION_1)
+            .add(FPM_VERSION_ONOS_EXT)
+            .build();
+
     public static final short FPM_TYPE_NETLINK = 1;
+    public static final short FPM_TYPE_PROTOBUF = 2;
+    public static final short FPM_TYPE_KEEPALIVE = 32;
 
     private static final String VERSION_NOT_SUPPORTED = "FPM version not supported: ";
     private static final String TYPE_NOT_SUPPORTED = "FPM type not supported: ";
@@ -119,17 +130,21 @@
         ByteBuffer bb = ByteBuffer.wrap(buffer, start, length);
 
         short version = bb.get();
-        if (version != FPM_VERSION_1) {
+        if (!SUPPORTED_VERSIONS.contains(version)) {
             throw new DeserializationException(VERSION_NOT_SUPPORTED + version);
         }
 
         short type = bb.get();
+        int messageLength = bb.getShort();
+
+        if (type == FPM_TYPE_KEEPALIVE) {
+            return new FpmHeader(version, type, messageLength, null);
+        }
+
         if (type != FPM_TYPE_NETLINK) {
             throw new DeserializationException(TYPE_NOT_SUPPORTED + type);
         }
 
-        int messageLength = bb.getShort();
-
         Netlink netlink = Netlink.decode(buffer, bb.position(), bb.limit() - bb.position());
 
         return new FpmHeader(version, type, messageLength, netlink);