Add support for timing out idle FPM connections.
This requires support for sending keepalives on the Quagga side.
Change-Id: I8551eb17b51460f2a66a7086b5c8006cc0d214a5
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
index 01321c1..2ff2874 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmManager.java
@@ -34,6 +34,8 @@
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.timeout.IdleStateHandler;
+import org.jboss.netty.util.HashedWheelTimer;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoNamespace;
@@ -84,6 +86,7 @@
private static final int FPM_PORT = 2620;
private static final String APP_NAME = "org.onosproject.fpm";
+ private static final int IDLE_TIMEOUT_SECS = 5;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@@ -163,19 +166,24 @@
}
private void startServer() {
+ HashedWheelTimer timer = new HashedWheelTimer(
+ groupedThreads("onos/fpm", "fpm-timer-%d", log));
+
ChannelFactory channelFactory = new NioServerSocketChannelFactory(
newCachedThreadPool(groupedThreads("onos/fpm", "sm-boss-%d", log)),
newCachedThreadPool(groupedThreads("onos/fpm", "sm-worker-%d", log)));
ChannelPipelineFactory pipelineFactory = () -> {
// Allocate a new session per connection
+ IdleStateHandler idleHandler =
+ new IdleStateHandler(timer, IDLE_TIMEOUT_SECS, 0, 0);
FpmSessionHandler fpmSessionHandler =
new FpmSessionHandler(new InternalFpmListener());
- FpmFrameDecoder fpmFrameDecoder =
- new FpmFrameDecoder();
+ FpmFrameDecoder fpmFrameDecoder = new FpmFrameDecoder();
// Setup the processing pipeline
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("FpmFrameDecoder", fpmFrameDecoder);
+ pipeline.addLast("idle", idleHandler);
pipeline.addLast("FpmSession", fpmSessionHandler);
return pipeline;
};
@@ -210,6 +218,10 @@
}
private void fpmMessage(FpmPeer peer, FpmHeader fpmMessage) {
+ if (fpmMessage.type() == FpmHeader.FPM_TYPE_KEEPALIVE) {
+ return;
+ }
+
Netlink netlink = fpmMessage.netlink();
RtNetlink rtNetlink = netlink.rtNetlink();
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
index f6e982f..1a75914 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/FpmSessionHandler.java
@@ -21,7 +21,9 @@
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
+import org.jboss.netty.handler.timeout.IdleStateEvent;
+import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.onosproject.routing.fpm.protocol.FpmHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +36,7 @@
/**
* Session handler for FPM protocol.
*/
-public class FpmSessionHandler extends SimpleChannelHandler {
+public class FpmSessionHandler extends IdleStateAwareChannelHandler {
private static Logger log = LoggerFactory.getLogger(FpmSessionHandler.class);
@@ -43,6 +45,9 @@
private Channel channel;
private FpmPeer us;
+ private boolean useKeepalives;
+ private boolean initialized;
+
/**
* Class constructor.
*
@@ -56,13 +61,35 @@
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
FpmHeader fpmMessage = (FpmHeader) e.getMessage();
+
+ initConnection(ctx, fpmMessage);
+
fpmListener.fpmMessage(us, fpmMessage);
}
+ private void initConnection(ChannelHandlerContext ctx, FpmHeader message) {
+ if (!initialized) {
+ useKeepalives = message.version() >= FpmHeader.FPM_VERSION_ONOS_EXT;
+ if (useKeepalives) {
+ log.info("Using keepalives");
+ } else {
+ log.info("Not using keepalives");
+ // Remove the idle channel handler if using a protocol version
+ // with no keepalive messages
+ ctx.getPipeline().remove("idle");
+ }
+ initialized = true;
+ }
+ }
+
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
- log.error("Exception thrown while handling FPM message", e.getCause());
+ if (e.getCause() instanceof ReadTimeoutException) {
+ log.warn("Haven't heard from FPM client for a while");
+ } else {
+ log.error("Exception thrown while handling FPM message", e.getCause());
+ }
if (channel != null) {
channel.close();
}
@@ -111,4 +138,14 @@
}
channel = null;
}
+
+ @Override
+ public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
+ throws Exception {
+ log.warn("FPM channel idle");
+ if (useKeepalives) {
+ ctx.getChannel().close();
+ }
+ super.channelIdle(ctx, e);
+ }
}
diff --git a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/FpmHeader.java b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/FpmHeader.java
index 5bba9fb..85ee9bc 100644
--- a/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/FpmHeader.java
+++ b/apps/routing/fpm/src/main/java/org/onosproject/routing/fpm/protocol/FpmHeader.java
@@ -17,6 +17,7 @@
package org.onosproject.routing.fpm.protocol;
import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
import org.onlab.packet.DeserializationException;
import java.nio.ByteBuffer;
@@ -30,7 +31,17 @@
public static final int FPM_HEADER_LENGTH = 4;
public static final short FPM_VERSION_1 = 1;
+ public static final short FPM_VERSION_ONOS_EXT = 32;
+
+ private static final ImmutableSet<Short> SUPPORTED_VERSIONS =
+ ImmutableSet.<Short>builder()
+ .add(FPM_VERSION_1)
+ .add(FPM_VERSION_ONOS_EXT)
+ .build();
+
public static final short FPM_TYPE_NETLINK = 1;
+ public static final short FPM_TYPE_PROTOBUF = 2;
+ public static final short FPM_TYPE_KEEPALIVE = 32;
private static final String VERSION_NOT_SUPPORTED = "FPM version not supported: ";
private static final String TYPE_NOT_SUPPORTED = "FPM type not supported: ";
@@ -119,17 +130,21 @@
ByteBuffer bb = ByteBuffer.wrap(buffer, start, length);
short version = bb.get();
- if (version != FPM_VERSION_1) {
+ if (!SUPPORTED_VERSIONS.contains(version)) {
throw new DeserializationException(VERSION_NOT_SUPPORTED + version);
}
short type = bb.get();
+ int messageLength = bb.getShort();
+
+ if (type == FPM_TYPE_KEEPALIVE) {
+ return new FpmHeader(version, type, messageLength, null);
+ }
+
if (type != FPM_TYPE_NETLINK) {
throw new DeserializationException(TYPE_NOT_SUPPORTED + type);
}
- int messageLength = bb.getShort();
-
Netlink netlink = Netlink.decode(buffer, bb.position(), bb.limit() - bb.position());
return new FpmHeader(version, type, messageLength, netlink);